You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2019/04/05 21:49:47 UTC

[incubator-druid] branch master updated: refactor lookups to be more chill to router (#7222)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 76b4a5c  refactor lookups to be more chill to router (#7222)
76b4a5c is described below

commit 76b4a5c62e47775715fb619bd4f19da5db1b5b6a
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Fri Apr 5 14:49:41 2019 -0700

    refactor lookups to be more chill to router (#7222)
    
    * refactor lookups to be more chill to router
    
    * remove accidental change
    
    * fix and combine LookupIntrospectionResourceTest
    
    * fix inspection
    
    * rename RouterLookupModule to LookupSerdeModule and RouterLookupExtractorFactoryContainerProvider to NoopLookupExtractorFactoryContainerProvider
    
    * make comment generic
    
    * use ConfigResourceFilter instead of StateResourceFilter
    
    * fix indentation
    
    * unused import
    
    * another unused import
    
    * refactor some stuff into processing module, split up LookupModule.java classes into their own files
---
 .../bloom/sql/BloomFilterSqlAggregatorTest.java    |   4 +-
 .../query/filter/sql/BloomDimFilterSqlTest.java    |   4 +-
 integration-tests/docker/broker.conf               |  11 +-
 integration-tests/docker/coordinator.conf          |  10 +-
 integration-tests/docker/historical.conf           |   6 +-
 integration-tests/docker/middlemanager.conf        |   8 +-
 integration-tests/docker/overlord.conf             |   7 +-
 integration-tests/docker/router.conf               |   7 +-
 integration-tests/docker/wiki-simple-lookup.json   |   1 +
 integration-tests/pom.xml                          |   5 +
 integration-tests/run_cluster.sh                   |   1 +
 .../clients/CoordinatorResourceTestClient.java     |  88 ++++++++++
 .../druid/tests/query/ITWikipediaQueryTest.java    |  22 ++-
 .../test/resources/queries/wiki-lookup-config.json |  19 ++
 .../queries/wikipedia_editstream_queries.json      |  51 ++++++
 .../druid/query/dimension/LookupDimensionSpec.java |  14 +-
 .../druid/query/expression/LookupExprMacro.java    |  10 +-
 .../LookupExtractorFactoryContainerProvider.java   |  32 ++++
 .../query/lookup/RegisteredLookupExtractionFn.java |   4 +-
 .../query/lookup/LookupIntrospectionResource.java  |  13 +-
 .../lookup/LookupListeningAnnouncerConfig.java     |  69 ++++++++
 .../query/lookup/LookupListeningResource.java      | 131 ++++++++++++++
 .../apache/druid/query/lookup/LookupModule.java    | 180 +------------------
 .../query/lookup/LookupReferencesManager.java      |  31 ++--
 .../lookup/LookupResourceListenerAnnouncer.java    |  46 +++++
 .../druid/query/lookup/LookupSerdeModule.java      |  73 ++++++++
 .../lookup/cache/LookupCoordinatorManager.java     |   5 +-
 .../query/dimension/LookupDimensionSpecTest.java   |  58 +++---
 .../LookupEnabledTestExprMacroTable.java           |  21 ++-
 .../LookupIntrospectionResourceImplTest.java       | 125 -------------
 .../lookup/LookupIntrospectionResourceTest.java    | 195 ++++++++++++---------
 .../lookup/RegisteredLookupExtractionFnTest.java   |  12 +-
 .../apache/druid/server/WebserverTestUtils.java    | 128 ++++++++++++++
 .../main/java/org/apache/druid/cli/CliRouter.java  |  58 +++---
 .../druid/cli/RouterJettyServerInitializer.java    |   4 +-
 .../builtin/QueryLookupOperatorConversion.java     |  10 +-
 .../java/org/apache/druid/sql/guice/SqlModule.java |   2 +-
 .../druid/sql/calcite/util/CalciteTests.java       |   6 +-
 38 files changed, 930 insertions(+), 541 deletions(-)

diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
index ceb8c42..ad4f90a 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
@@ -54,7 +54,7 @@ import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.extraction.SubstringDimExtractionFn;
 import org.apache.druid.query.filter.BloomKFilter;
-import org.apache.druid.query.lookup.LookupReferencesManager;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.QueryableIndex;
@@ -99,7 +99,7 @@ public class BloomFilterSqlAggregatorTest
   private static final Injector injector = Guice.createInjector(
       binder -> {
         binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
-        binder.bind(LookupReferencesManager.class).toInstance(
+        binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(
             LookupEnabledTestExprMacroTable.createTestLookupReferencesManager(
                 ImmutableMap.of(
                     "a", "xa",
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/sql/BloomDimFilterSqlTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/sql/BloomDimFilterSqlTest.java
index c8243dc..be32f01 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/sql/BloomDimFilterSqlTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/sql/BloomDimFilterSqlTest.java
@@ -44,7 +44,7 @@ import org.apache.druid.query.filter.BloomKFilter;
 import org.apache.druid.query.filter.BloomKFilterHolder;
 import org.apache.druid.query.filter.ExpressionDimFilter;
 import org.apache.druid.query.filter.OrDimFilter;
-import org.apache.druid.query.lookup.LookupReferencesManager;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.server.security.AuthenticationResult;
@@ -67,7 +67,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
   private static final Injector injector = Guice.createInjector(
       binder -> {
         binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
-        binder.bind(LookupReferencesManager.class).toInstance(
+        binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(
             LookupEnabledTestExprMacroTable.createTestLookupReferencesManager(
                 ImmutableMap.of(
                     "a", "xa",
diff --git a/integration-tests/docker/broker.conf b/integration-tests/docker/broker.conf
index 39b8a7f..f075558 100644
--- a/integration-tests/docker/broker.conf
+++ b/integration-tests/docker/broker.conf
@@ -5,21 +5,19 @@ command=java
   -Xms512m
   -XX:NewSize=256m
   -XX:MaxNewSize=256m
-  -XX:+UseConcMarkSweepGC
-  -XX:+PrintGCDetails
-  -XX:+PrintGCTimeStamps
+  -XX:+UseG1GC
   -Duser.timezone=UTC
   -Dfile.encoding=UTF-8
   -Ddruid.host=%(ENV_HOST_IP)s
   -Ddruid.zk.service.host=druid-zookeeper-kafka
   -Ddruid.processing.buffer.sizeBytes=25000000
-  -Ddruid.server.http.numThreads=100
+  -Ddruid.server.http.numThreads=40
   -Ddruid.processing.numThreads=1
-  -Ddruid.broker.http.numConnections=30
+  -Ddruid.broker.http.numConnections=20
   -Ddruid.broker.http.readTimeout=PT5M
   -Ddruid.broker.cache.useCache=true
   -Ddruid.broker.cache.populateCache=true
-  -Ddruid.cache.type=local
+  -Ddruid.lookup.namespace.cache.type=onHeap
   -Ddruid.cache.sizeInBytes=40000000
   -Ddruid.lookup.numLookupLoadingThreads=1
   -Ddruid.auth.authenticatorChain="[\"basic\"]"
@@ -63,3 +61,4 @@ redirect_stderr=true
 autorestart=false
 priority=100
 stdout_logfile=/shared/logs/broker.log
+environment=AWS_REGION=us-east-1
diff --git a/integration-tests/docker/coordinator.conf b/integration-tests/docker/coordinator.conf
index e05c924..e92f2d5 100644
--- a/integration-tests/docker/coordinator.conf
+++ b/integration-tests/docker/coordinator.conf
@@ -3,13 +3,11 @@ command=java
   -server
   -Xmx128m
   -Xms128m
-  -XX:+UseConcMarkSweepGC
-  -XX:+PrintGCDetails
-  -XX:+PrintGCTimeStamps
+  -XX:+UseG1GC
   -Duser.timezone=UTC
   -Dfile.encoding=UTF-8
   -Ddruid.host=%(ENV_HOST_IP)s
-  -Ddruid.server.http.numThreads=100
+  -Ddruid.server.http.numThreads=20
   -Ddruid.metadata.storage.type=mysql
   -Ddruid.metadata.storage.connector.connectURI=jdbc:mysql://druid-metadata-storage/druid
   -Ddruid.metadata.storage.connector.user=druid
@@ -17,6 +15,9 @@ command=java
   -Ddruid.zk.service.host=druid-zookeeper-kafka
   -Ddruid.coordinator.startDelay=PT5S
   -Ddruid.lookup.numLookupLoadingThreads=1
+  -Ddruid.manager.lookups.hostUpdateTimeout=PT30S
+  -Ddruid.manager.lookups.period=10000
+  -Ddruid.manager.lookups.threadPoolSize=2
   -Ddruid.auth.authenticatorChain="[\"basic\"]"
   -Ddruid.auth.authenticator.basic.type=basic
   -Ddruid.auth.authenticator.basic.initialAdminPassword=priest
@@ -57,3 +58,4 @@ redirect_stderr=true
 priority=100
 autorestart=false
 stdout_logfile=/shared/logs/coordinator.log
+environment=AWS_REGION=us-east-1
diff --git a/integration-tests/docker/historical.conf b/integration-tests/docker/historical.conf
index 335b8c3..964e755 100644
--- a/integration-tests/docker/historical.conf
+++ b/integration-tests/docker/historical.conf
@@ -5,9 +5,7 @@ command=java
   -Xms512m
   -XX:NewSize=256m
   -XX:MaxNewSize=256m
-  -XX:+UseConcMarkSweepGC
-  -XX:+PrintGCDetails
-  -XX:+PrintGCTimeStamps
+  -XX:+UseG1GC
   -Duser.timezone=UTC
   -Dfile.encoding=UTF-8
   -Ddruid.host=%(ENV_HOST_IP)s
@@ -16,7 +14,7 @@ command=java
   -Ddruid.s3.secretKey=OBaLISDFjKLajSTrJ53JoTtzTZLjPlRePcwa+Pjv
   -Ddruid.processing.buffer.sizeBytes=25000000
   -Ddruid.processing.numThreads=2
-  -Ddruid.server.http.numThreads=100
+  -Ddruid.server.http.numThreads=20
   -Ddruid.segmentCache.locations="[{\"path\":\"/shared/druid/indexCache\",\"maxSize\":5000000000}]"
   -Ddruid.server.maxSize=5000000000
   -Ddruid.lookup.numLookupLoadingThreads=1
diff --git a/integration-tests/docker/middlemanager.conf b/integration-tests/docker/middlemanager.conf
index 40adf19..35cfbb1 100644
--- a/integration-tests/docker/middlemanager.conf
+++ b/integration-tests/docker/middlemanager.conf
@@ -3,9 +3,7 @@ command=java
   -server
   -Xmx64m
   -Xms64m
-  -XX:+UseConcMarkSweepGC
-  -XX:+PrintGCDetails
-  -XX:+PrintGCTimeStamps
+  -XX:+UseG1GC
   -Duser.timezone=UTC
   -Dfile.encoding=UTF-8
   -Ddruid.host=%(ENV_HOST_IP)s
@@ -14,10 +12,10 @@ command=java
   -Ddruid.worker.capacity=3
   -Ddruid.indexer.logs.directory=/shared/tasklogs
   -Ddruid.storage.storageDirectory=/shared/storage
-  -Ddruid.indexer.runner.javaOpts="-server -Xmx256m -Xms256m -XX:NewSize=128m -XX:MaxNewSize=128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml"
+  -Ddruid.indexer.runner.javaOpts="-server -Xmx256m -Xms256m -XX:NewSize=128m -XX:MaxNewSize=128m -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml"
   -Ddruid.indexer.fork.property.druid.processing.buffer.sizeBytes=25000000
   -Ddruid.indexer.fork.property.druid.processing.numThreads=1
-  -Ddruid.indexer.fork.server.http.numThreads=100
+  -Ddruid.indexer.fork.server.http.numThreads=20
   -Ddruid.s3.accessKey=AKIAJI7DG7CDECGBQ6NA
   -Ddruid.s3.secretKey=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
   -Ddruid.worker.ip=%(ENV_HOST_IP)s
diff --git a/integration-tests/docker/overlord.conf b/integration-tests/docker/overlord.conf
index 77514b1..5af40d6 100644
--- a/integration-tests/docker/overlord.conf
+++ b/integration-tests/docker/overlord.conf
@@ -3,13 +3,11 @@ command=java
   -server
   -Xmx128m
   -Xms128m
-  -XX:+UseConcMarkSweepGC
-  -XX:+PrintGCDetails
-  -XX:+PrintGCTimeStamps
+  -XX:+UseG1GC
   -Duser.timezone=UTC
   -Dfile.encoding=UTF-8
   -Ddruid.host=%(ENV_HOST_IP)s
-  -Ddruid.server.http.numThreads=100
+  -Ddruid.server.http.numThreads=20
   -Ddruid.metadata.storage.type=mysql
   -Ddruid.metadata.storage.connector.connectURI=jdbc:mysql://druid-metadata-storage/druid
   -Ddruid.metadata.storage.connector.user=druid
@@ -58,3 +56,4 @@ redirect_stderr=true
 priority=100
 autorestart=false
 stdout_logfile=/shared/logs/overlord.log
+environment=AWS_REGION=us-east-1
\ No newline at end of file
diff --git a/integration-tests/docker/router.conf b/integration-tests/docker/router.conf
index 29bb6b5..98a7a97 100644
--- a/integration-tests/docker/router.conf
+++ b/integration-tests/docker/router.conf
@@ -2,14 +2,12 @@
 command=java
   -server
   -Xmx128m
-  -XX:+UseConcMarkSweepGC
-  -XX:+PrintGCDetails
-  -XX:+PrintGCTimeStamps
+  -XX:+UseG1GC
   -Duser.timezone=UTC
   -Dfile.encoding=UTF-8
   -Ddruid.host=%(ENV_HOST_IP)s
   -Ddruid.zk.service.host=druid-zookeeper-kafka
-  -Ddruid.server.http.numThreads=100
+  -Ddruid.server.http.numThreads=20
   -Ddruid.lookup.numLookupLoadingThreads=1
   -Ddruid.auth.authenticatorChain="[\"basic\"]"
   -Ddruid.auth.authenticator.basic.type=basic
@@ -52,3 +50,4 @@ redirect_stderr=true
 priority=100
 autorestart=false
 stdout_logfile=/shared/logs/router.log
+environment=AWS_REGION=us-east-1
diff --git a/integration-tests/docker/wiki-simple-lookup.json b/integration-tests/docker/wiki-simple-lookup.json
new file mode 100644
index 0000000..a3de9b1
--- /dev/null
+++ b/integration-tests/docker/wiki-simple-lookup.json
@@ -0,0 +1 @@
+{"Wikipedia:Vandalismusmeldung":"lookup!"}
\ No newline at end of file
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 6159021..91209ce 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -79,6 +79,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.druid.extensions</groupId>
+            <artifactId>druid-lookups-cached-global</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid.extensions</groupId>
             <artifactId>simple-client-sslcontext</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
diff --git a/integration-tests/run_cluster.sh b/integration-tests/run_cluster.sh
index 49d046a..25078eb 100755
--- a/integration-tests/run_cluster.sh
+++ b/integration-tests/run_cluster.sh
@@ -56,6 +56,7 @@ cp target/druid-integration-tests*.jar $SHARED_DIR/docker/lib
 # one of the integration tests needs the wikiticker sample data
 mkdir -p $SHARED_DIR/wikiticker-it
 cp ../examples/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz $SHARED_DIR/wikiticker-it/wikiticker-2015-09-12-sampled.json.gz
+cp docker/wiki-simple-lookup.json $SHARED_DIR/wikiticker-it/wiki-simple-lookup.json
 
 docker network create --subnet=172.172.172.0/24 druid-it-net
 
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java
index 6974b44..babb9e3 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java
@@ -21,6 +21,8 @@ package org.apache.druid.testing.clients;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.net.HostAndPort;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.RE;
@@ -29,6 +31,8 @@ import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
 import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.query.lookup.LookupsState;
+import org.apache.druid.server.lookup.cache.LookupExtractorFactoryMapContainer;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.guice.TestClient;
 import org.apache.druid.timeline.DataSegment;
@@ -218,6 +222,90 @@ public class CoordinatorResourceTestClient
     }
   }
 
+  /**
+   * POSTs an empty map to the coordinator's lookups/config endpoint, then POSTs the lookup
+   * configuration parsed from the classpath resource {@code filePath} to the same endpoint.
+   *
+   * @param filePath classpath resource (resolved against this class) containing the lookup config JSON
+   * @return the JSON body of the response to the second POST, as a map
+   * @throws ISE if either POST is answered with a status other than ACCEPTED
+   */
+  public Map<String, Object> initializeLookups(String filePath) throws Exception
+  {
+    String url = StringUtils.format("%slookups/config", getCoordinatorURL());
+    StatusResponseHolder response = httpClient.go(
+        new Request(HttpMethod.POST, new URL(url)).setContent(
+            "application/json",
+            jsonMapper.writeValueAsBytes(ImmutableMap.of())
+        ), responseHandler
+    ).get();
+
+    if (!response.getStatus().equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Error while querying[%s] status[%s] content[%s]",
+          url,
+          response.getStatus(),
+          response.getContent()
+      );
+    }
+
+    StatusResponseHolder response2 = httpClient.go(
+        new Request(HttpMethod.POST, new URL(url)).setContent(
+            "application/json",
+            jsonMapper.writeValueAsBytes(jsonMapper.readValue(CoordinatorResourceTestClient.class.getResourceAsStream(filePath), new TypeReference<Map<Object, Object>>(){}))
+        ), responseHandler
+    ).get();
+
+    if (!response2.getStatus().equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Error while querying[%s] status[%s] content[%s]",
+          url,
+          response2.getStatus(),
+          response2.getContent()
+      );
+    }
+
+    // the returned map is the body of the configuration POST (response2), not of the empty-map POST
+    return jsonMapper.readValue(
+        response2.getContent(),
+        new TypeReference<Map<String, Object>>(){}
+    );
+  }
+
+  /**
+   * Fetches lookups/nodeStatus from the coordinator and deserializes the response body; any exception
+   * raised while requesting or parsing is rethrown wrapped in a {@link RuntimeException}.
+   */
+  private Map<String, Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>> getLookupLoadStatus()
+  {
+    final String statusUrl = StringUtils.format("%slookups/nodeStatus", getCoordinatorURL());
+    final TypeReference<Map<String, Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>>> type =
+        new TypeReference<Map<String, Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>>>()
+        {
+        };
+
+    try {
+      return jsonMapper.readValue(makeRequest(HttpMethod.GET, statusUrl).getContent(), type);
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns true when every host the coordinator reports for the "__default" tier lists
+   * {@code lookup} among its current lookups; false if that tier is absent from the status.
+   */
+  public boolean areLookupsLoaded(String lookup)
+  {
+    final Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> defaultTier =
+        getLookupLoadStatus().get("__default");
+
+    // a missing tier is reported as "not loaded" so pollers keep waiting instead of hitting an NPE
+    return defaultTier != null
+           && defaultTier.values().stream().allMatch(state -> state.getCurrent().containsKey(lookup));
+  }
+
   private StatusResponseHolder makeRequest(HttpMethod method, String url)
   {
     try {
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
index 637cf6c..fa1658c 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
@@ -28,31 +28,30 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
-import java.util.concurrent.Callable;
-
 @Guice(moduleFactory = DruidTestModuleFactory.class)
 public class ITWikipediaQueryTest
 {
   private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
+  private static final String WIKI_LOOKUP = "wiki-simple";
   private static final String WIKIPEDIA_QUERIES_RESOURCE = "/queries/wikipedia_editstream_queries.json";
+  private static final String WIKIPEDIA_LOOKUP_RESOURCE = "/queries/wiki-lookup-config.json";
+
   @Inject
   private CoordinatorResourceTestClient coordinatorClient;
   @Inject
   private TestQueryHelper queryHelper;
 
   @BeforeMethod
-  public void before()
+  public void before() throws Exception
   {
+
     // ensure that wikipedia segments are loaded completely
     RetryUtil.retryUntilTrue(
-        new Callable<Boolean>()
-        {
-          @Override
-          public Boolean call()
-          {
-            return coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE);
-          }
-        }, "wikipedia segment load"
+        () -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load"
+    );
+    coordinatorClient.initializeLookups(WIKIPEDIA_LOOKUP_RESOURCE);
+    RetryUtil.retryUntilTrue(
+        () -> coordinatorClient.areLookupsLoaded(WIKI_LOOKUP), "wikipedia lookup load"
     );
   }
 
@@ -61,5 +60,4 @@ public class ITWikipediaQueryTest
   {
     queryHelper.testQueriesFromFile(WIKIPEDIA_QUERIES_RESOURCE, 2);
   }
-
 }
diff --git a/integration-tests/src/test/resources/queries/wiki-lookup-config.json b/integration-tests/src/test/resources/queries/wiki-lookup-config.json
new file mode 100644
index 0000000..1b1969f
--- /dev/null
+++ b/integration-tests/src/test/resources/queries/wiki-lookup-config.json
@@ -0,0 +1,19 @@
+{
+  "__default": {
+    "wiki-simple": {
+      "version": "v1",
+      "lookupExtractorFactory": {
+        "type": "cachedNamespace",
+        "extractionNamespace": {
+          "type": "uri",
+          "uri": "file:/shared/wikiticker-it/wiki-simple-lookup.json",
+          "namespaceParseSpec": {
+            "format": "simpleJson"
+          },
+          "pollPeriod": "PT10S"
+        },
+        "firstCacheTimeout": 0
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json
index 3cbdf4f..846fcd2 100644
--- a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json
+++ b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json
@@ -1248,5 +1248,56 @@
                 "rollup":null
             }
         ]
+    },
+    {
+        "description": "topN, 1 agg, lookups",
+        "query": {
+            "queryType": "topN",
+            "dataSource": "wikipedia_editstream",
+            "intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"],
+            "granularity": "all",
+            "aggregations": [
+                {
+                    "type": "count",
+                    "name": "rows"
+                }
+            ],
+            "dimension": {
+                "type" : "extraction",
+                "dimension" : "page",
+                "outputName" :  "lookupPage",
+                "extractionFn" : {
+                    "type":"registeredLookup",
+                    "lookup":"wiki-simple",
+                    "retainMissingValue":true
+                }
+            },
+            "metric": "rows",
+            "threshold": 3,
+            "context": {
+                "useCache": "true",
+                "populateCache": "true",
+                "timeout": 360000
+            }
+        },
+        "expectedResults": [
+            {
+                "timestamp": "2013-01-01T00:00:00.000Z",
+                "result": [
+                    {
+                        "lookupPage": "lookup!",
+                        "rows": 991
+                    },
+                    {
+                        "lookupPage": "Wikipedia:Administrators'_noticeboard/Incidents",
+                        "rows": 990
+                    },
+                    {
+                        "lookupPage": "Wikipedia:Administrator_intervention_against_vandalism",
+                        "rows": 800
+                    }
+                ]
+            }
+        ]
     }
 ]
diff --git a/server/src/main/java/org/apache/druid/query/dimension/LookupDimensionSpec.java b/processing/src/main/java/org/apache/druid/query/dimension/LookupDimensionSpec.java
similarity index 92%
rename from server/src/main/java/org/apache/druid/query/dimension/LookupDimensionSpec.java
rename to processing/src/main/java/org/apache/druid/query/dimension/LookupDimensionSpec.java
index 2bfeaf0..d245efb 100644
--- a/server/src/main/java/org/apache/druid/query/dimension/LookupDimensionSpec.java
+++ b/processing/src/main/java/org/apache/druid/query/dimension/LookupDimensionSpec.java
@@ -30,7 +30,7 @@ import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.filter.DimFilterUtils;
 import org.apache.druid.query.lookup.LookupExtractionFn;
 import org.apache.druid.query.lookup.LookupExtractor;
-import org.apache.druid.query.lookup.LookupReferencesManager;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.column.ValueType;
 
@@ -63,7 +63,7 @@ public class LookupDimensionSpec implements DimensionSpec
   @JsonProperty
   private final boolean optimize;
 
-  private final LookupReferencesManager lookupReferencesManager;
+  private final LookupExtractorFactoryContainerProvider lookupExtractorFactoryContainerProvider;
 
   @JsonCreator
   public LookupDimensionSpec(
@@ -73,8 +73,8 @@ public class LookupDimensionSpec implements DimensionSpec
       @JsonProperty("retainMissingValue") boolean retainMissingValue,
       @JsonProperty("replaceMissingValueWith") String replaceMissingValueWith,
       @JsonProperty("name") String name,
-      @JacksonInject LookupReferencesManager lookupReferencesManager,
-      @JsonProperty("optimize") Boolean optimize
+      @JsonProperty("optimize") Boolean optimize,
+      @JacksonInject LookupExtractorFactoryContainerProvider lookupExtractorFactoryContainerProvider
   )
   {
     this.retainMissingValue = retainMissingValue;
@@ -82,7 +82,7 @@ public class LookupDimensionSpec implements DimensionSpec
     this.replaceMissingValueWith = NullHandling.emptyToNullIfNeeded(replaceMissingValueWith);
     this.dimension = Preconditions.checkNotNull(dimension, "dimension can not be Null");
     this.outputName = Preconditions.checkNotNull(outputName, "outputName can not be Null");
-    this.lookupReferencesManager = lookupReferencesManager;
+    this.lookupExtractorFactoryContainerProvider = lookupExtractorFactoryContainerProvider;
     this.name = name;
     this.lookup = lookup;
     Preconditions.checkArgument(
@@ -92,7 +92,7 @@ public class LookupDimensionSpec implements DimensionSpec
 
     if (!Strings.isNullOrEmpty(name)) {
       Preconditions.checkNotNull(
-          this.lookupReferencesManager,
+          this.lookupExtractorFactoryContainerProvider,
           "The system is not configured to allow for lookups, please read about configuring a lookup manager in the docs"
       );
     }
@@ -139,7 +139,7 @@ public class LookupDimensionSpec implements DimensionSpec
     final LookupExtractor lookupExtractor = Strings.isNullOrEmpty(name)
                                             ? this.lookup
                                             : Preconditions.checkNotNull(
-                                                lookupReferencesManager.get(name),
+                                                lookupExtractorFactoryContainerProvider.get(name),
                                                 "Lookup [%s] not found",
                                                 name
                                             ).getLookupExtractorFactory().get();
diff --git a/server/src/main/java/org/apache/druid/query/expression/LookupExprMacro.java b/processing/src/main/java/org/apache/druid/query/expression/LookupExprMacro.java
similarity index 85%
rename from server/src/main/java/org/apache/druid/query/expression/LookupExprMacro.java
rename to processing/src/main/java/org/apache/druid/query/expression/LookupExprMacro.java
index c6c912f..3a5e40e 100644
--- a/server/src/main/java/org/apache/druid/query/expression/LookupExprMacro.java
+++ b/processing/src/main/java/org/apache/druid/query/expression/LookupExprMacro.java
@@ -25,7 +25,7 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprEval;
 import org.apache.druid.math.expr.ExprMacroTable;
-import org.apache.druid.query.lookup.LookupReferencesManager;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.query.lookup.RegisteredLookupExtractionFn;
 
 import javax.annotation.Nonnull;
@@ -33,12 +33,12 @@ import java.util.List;
 
 public class LookupExprMacro implements ExprMacroTable.ExprMacro
 {
-  private final LookupReferencesManager lookupReferencesManager;
+  private final LookupExtractorFactoryContainerProvider lookupExtractorFactoryContainerProvider;
 
   @Inject
-  public LookupExprMacro(final LookupReferencesManager lookupReferencesManager)
+  public LookupExprMacro(final LookupExtractorFactoryContainerProvider lookupExtractorFactoryContainerProvider)
   {
-    this.lookupReferencesManager = lookupReferencesManager;
+    this.lookupExtractorFactoryContainerProvider = lookupExtractorFactoryContainerProvider;
   }
 
   @Override
@@ -63,7 +63,7 @@ public class LookupExprMacro implements ExprMacroTable.ExprMacro
 
     final String lookupName = lookupExpr.getLiteralValue().toString();
     final RegisteredLookupExtractionFn extractionFn = new RegisteredLookupExtractionFn(
-        lookupReferencesManager,
+        lookupExtractorFactoryContainerProvider,
         lookupName,
         false,
         null,
diff --git a/processing/src/main/java/org/apache/druid/query/lookup/LookupExtractorFactoryContainerProvider.java b/processing/src/main/java/org/apache/druid/query/lookup/LookupExtractorFactoryContainerProvider.java
new file mode 100644
index 0000000..a9fada0
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/lookup/LookupExtractorFactoryContainerProvider.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.lookup;
+
+import javax.annotation.Nullable;
+
+/**
+ * Provides {@link LookupExtractorFactoryContainer}, or null, to query and indexing time dimension transformations.
+ */
+@FunctionalInterface
+public interface LookupExtractorFactoryContainerProvider
+{
+  @Nullable
+  LookupExtractorFactoryContainer get(String lookupName);
+}
diff --git a/server/src/main/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFn.java b/processing/src/main/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFn.java
similarity index 97%
rename from server/src/main/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFn.java
rename to processing/src/main/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFn.java
index da2df8e..6595d6c 100644
--- a/server/src/main/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFn.java
+++ b/processing/src/main/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFn.java
@@ -35,7 +35,7 @@ public class RegisteredLookupExtractionFn implements ExtractionFn
   // Protected for moving to not-null by `delegateLock`
   private volatile LookupExtractionFn delegate = null;
   private final Object delegateLock = new Object();
-  private final LookupReferencesManager manager;
+  private final LookupExtractorFactoryContainerProvider manager;
   private final String lookup;
   private final boolean retainMissingValue;
   private final String replaceMissingValueWith;
@@ -44,7 +44,7 @@ public class RegisteredLookupExtractionFn implements ExtractionFn
 
   @JsonCreator
   public RegisteredLookupExtractionFn(
-      @JacksonInject LookupReferencesManager manager,
+      @JacksonInject LookupExtractorFactoryContainerProvider manager,
       @JsonProperty("lookup") String lookup,
       @JsonProperty("retainMissingValue") final boolean retainMissingValue,
       @Nullable @JsonProperty("replaceMissingValueWith") final String replaceMissingValueWith,
diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupIntrospectionResource.java b/server/src/main/java/org/apache/druid/query/lookup/LookupIntrospectionResource.java
index 03c3e97..092ab6c 100644
--- a/server/src/main/java/org/apache/druid/query/lookup/LookupIntrospectionResource.java
+++ b/server/src/main/java/org/apache/druid/query/lookup/LookupIntrospectionResource.java
@@ -20,7 +20,9 @@
 package org.apache.druid.query.lookup;
 
 import com.google.inject.Inject;
+import com.sun.jersey.spi.container.ResourceFilters;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.http.security.ConfigResourceFilter;
 
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
@@ -28,22 +30,25 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
 
 @Path("/druid/v1/lookups/introspect")
+@ResourceFilters(ConfigResourceFilter.class)
 public class LookupIntrospectionResource
 {
   private static final Logger LOGGER = new Logger(LookupIntrospectionResource.class);
 
-  private final LookupReferencesManager lookupReferencesManager;
+  private final LookupExtractorFactoryContainerProvider lookupExtractorFactoryContainerProvider;
 
   @Inject
-  public LookupIntrospectionResource(@Context LookupReferencesManager lookupReferencesManager)
+  public LookupIntrospectionResource(
+      @Context LookupExtractorFactoryContainerProvider lookupExtractorFactoryContainerProvider
+  )
   {
-    this.lookupReferencesManager = lookupReferencesManager;
+    this.lookupExtractorFactoryContainerProvider = lookupExtractorFactoryContainerProvider;
   }
 
   @Path("/{lookupId}")
   public Object introspectLookup(@PathParam("lookupId") final String lookupId)
   {
-    final LookupExtractorFactoryContainer container = lookupReferencesManager.get(lookupId);
+    final LookupExtractorFactoryContainer container = lookupExtractorFactoryContainerProvider.get(lookupId);
 
     if (container == null) {
       LOGGER.error("trying to introspect non existing lookup [%s]", lookupId);
diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java
new file mode 100644
index 0000000..73b69f7
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.lookup;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.server.initialization.ZkPathsConfig;
+import org.apache.druid.server.listener.announcer.ListeningAnnouncerConfig;
+import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
+
+class LookupListeningAnnouncerConfig extends ListeningAnnouncerConfig
+{
+  public static final String DEFAULT_TIER = "__default";
+  private final DataSourceTaskIdHolder dataSourceTaskIdHolder;
+  @JsonProperty("lookupTier")
+  private String lookupTier = null;
+  @JsonProperty("lookupTierIsDatasource")
+  private boolean lookupTierIsDatasource = false;
+
+  @JsonCreator
+  public LookupListeningAnnouncerConfig(
+      @JacksonInject ZkPathsConfig zkPathsConfig,
+      @JacksonInject DataSourceTaskIdHolder dataSourceTaskIdHolder
+  )
+  {
+    super(zkPathsConfig);
+    this.dataSourceTaskIdHolder = dataSourceTaskIdHolder;
+  }
+
+  public String getLookupTier()
+  {
+    Preconditions.checkArgument(
+        !(lookupTierIsDatasource && null != lookupTier),
+        "Cannot specify both `lookupTier` and `lookupTierIsDatasource`"
+    );
+    final String lookupTier = lookupTierIsDatasource ? dataSourceTaskIdHolder.getDataSource() : this.lookupTier;
+
+    return Preconditions.checkNotNull(
+        lookupTier == null ? DEFAULT_TIER : StringUtils.emptyToNullNonDruidDataString(lookupTier),
+        "Cannot have empty lookup tier from %s",
+        lookupTierIsDatasource ? "bound value" : LookupModule.PROPERTY_BASE
+    );
+  }
+
+  public String getLookupKey()
+  {
+    return LookupModule.getTierListenerPath(getLookupTier());
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java
new file mode 100644
index 0000000..e176734
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.lookup;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.common.utils.ServletResourceUtils;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.guice.annotations.Smile;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.http.security.ConfigResourceFilter;
+import org.apache.druid.server.listener.resource.AbstractListenerHandler;
+import org.apache.druid.server.listener.resource.ListenerResource;
+import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
+
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+@Path(ListenerResource.BASE_PATH + "/" + LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY)
+@ResourceFilters(ConfigResourceFilter.class)
+class LookupListeningResource extends ListenerResource
+{
+  private static final Logger LOG = new Logger(LookupListeningResource.class);
+
+  private static final TypeReference<LookupsState<LookupExtractorFactoryContainer>> LOOKUPS_STATE_TYPE_REFERENCE =
+      new TypeReference<LookupsState<LookupExtractorFactoryContainer>>()
+      {
+      };
+
+  @Inject
+  public LookupListeningResource(
+      final @Json ObjectMapper jsonMapper,
+      final @Smile ObjectMapper smileMapper,
+      final LookupReferencesManager manager
+  )
+  {
+    super(
+        jsonMapper,
+        smileMapper,
+        new AbstractListenerHandler<LookupExtractorFactory>(new TypeReference<LookupExtractorFactory>()
+        {
+        })
+        {
+          @Override
+          public Response handleUpdates(InputStream inputStream, ObjectMapper mapper)
+          {
+            final LookupsState<LookupExtractorFactoryContainer> state;
+            try {
+              state = mapper.readValue(inputStream, LOOKUPS_STATE_TYPE_REFERENCE);
+            }
+            catch (final IOException ex) {
+              LOG.debug(ex, "Bad request");
+              return Response.status(Response.Status.BAD_REQUEST)
+                             .entity(ServletResourceUtils.sanitizeException(ex))
+                             .build();
+            }
+
+            try {
+              state.getToLoad().forEach(manager::add);
+              state.getToDrop().forEach(manager::remove);
+
+              return Response.status(Response.Status.ACCEPTED).entity(manager.getAllLookupsState()).build();
+            }
+            catch (Exception e) {
+              LOG.error(e, "Error handling request");
+              return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
+            }
+          }
+
+          @Override
+          public Object post(final Map<String, LookupExtractorFactory> lookups)
+          {
+            final Map<String, LookupExtractorFactory> failedUpdates = new HashMap<>();
+            for (final String name : lookups.keySet()) {
+
+              final LookupExtractorFactoryContainer factoryContainer = new LookupExtractorFactoryContainer(
+                  null,
+                  lookups.get(name)
+              );
+
+              manager.add(name, factoryContainer);
+            }
+            return ImmutableMap.of("status", "accepted", LookupModule.FAILED_UPDATES_KEY, failedUpdates);
+          }
+
+          @Override
+          public Object get(String id)
+          {
+            return manager.get(id);
+          }
+
+          @Override
+          public LookupsState<LookupExtractorFactoryContainer> getAll()
+          {
+            return manager.getAllLookupsState();
+          }
+
+          @Override
+          public Object delete(String id)
+          {
+            manager.remove(id);
+            return id;
+          }
+        }
+    );
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupModule.java b/server/src/main/java/org/apache/druid/query/lookup/LookupModule.java
index 9f14b0d..c173dc1 100644
--- a/server/src/main/java/org/apache/druid/query/lookup/LookupModule.java
+++ b/server/src/main/java/org/apache/druid/query/lookup/LookupModule.java
@@ -19,24 +19,13 @@
 
 package org.apache.druid.query.lookup;
 
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.inject.Binder;
-import com.google.inject.Inject;
 import com.google.inject.Provides;
-import com.sun.jersey.spi.container.ResourceFilters;
 import org.apache.curator.utils.ZKPaths;
-import org.apache.druid.common.utils.ServletResourceUtils;
-import org.apache.druid.curator.announcement.Announcer;
 import org.apache.druid.discovery.LookupNodeService;
 import org.apache.druid.guice.ExpressionModule;
 import org.apache.druid.guice.Jerseys;
@@ -44,33 +33,14 @@ import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.guice.annotations.Json;
-import org.apache.druid.guice.annotations.Self;
-import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.dimension.LookupDimensionSpec;
 import org.apache.druid.query.expression.LookupExprMacro;
-import org.apache.druid.server.DruidNode;
-import org.apache.druid.server.http.HostAndPortWithScheme;
-import org.apache.druid.server.http.security.ConfigResourceFilter;
-import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.apache.druid.server.initialization.jetty.JettyBindings;
-import org.apache.druid.server.listener.announcer.ListenerResourceAnnouncer;
-import org.apache.druid.server.listener.announcer.ListeningAnnouncerConfig;
-import org.apache.druid.server.listener.resource.AbstractListenerHandler;
 import org.apache.druid.server.listener.resource.ListenerResource;
 import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
-import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
 
-import javax.ws.rs.Path;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 public class LookupModule implements DruidModule
 {
@@ -98,6 +68,7 @@ public class LookupModule implements DruidModule
   public void configure(Binder binder)
   {
     JsonConfigProvider.bind(binder, PROPERTY_BASE, LookupConfig.class);
+    binder.bind(LookupExtractorFactoryContainerProvider.class).to(LookupReferencesManager.class);
     LifecycleModule.register(binder, LookupReferencesManager.class);
     JsonConfigProvider.bind(binder, PROPERTY_BASE, LookupListeningAnnouncerConfig.class);
     Jerseys.addResource(binder, LookupListeningResource.class);
@@ -121,152 +92,3 @@ public class LookupModule implements DruidModule
   }
 }
 
-@Path(ListenerResource.BASE_PATH + "/" + LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY)
-@ResourceFilters(ConfigResourceFilter.class)
-class LookupListeningResource extends ListenerResource
-{
-  private static final Logger LOG = new Logger(LookupListeningResource.class);
-
-  private static final TypeReference<LookupsState<LookupExtractorFactoryContainer>> LOOKUPS_STATE_TYPE_REFERENCE =
-      new TypeReference<LookupsState<LookupExtractorFactoryContainer>>()
-      {
-      };
-
-  @Inject
-  public LookupListeningResource(
-      final @Json ObjectMapper jsonMapper,
-      final @Smile ObjectMapper smileMapper,
-      final LookupReferencesManager manager
-  )
-  {
-    super(
-        jsonMapper,
-        smileMapper,
-        new AbstractListenerHandler<LookupExtractorFactory>(new TypeReference<LookupExtractorFactory>()
-        {
-        })
-        {
-          @Override
-          public Response handleUpdates(InputStream inputStream, ObjectMapper mapper)
-          {
-            final LookupsState<LookupExtractorFactoryContainer> state;
-            try {
-              state = mapper.readValue(inputStream, LOOKUPS_STATE_TYPE_REFERENCE);
-            }
-            catch (final IOException ex) {
-              LOG.debug(ex, "Bad request");
-              return Response.status(Response.Status.BAD_REQUEST)
-                             .entity(ServletResourceUtils.sanitizeException(ex))
-                             .build();
-            }
-
-            try {
-              state.getToLoad().forEach(manager::add);
-              state.getToDrop().forEach(manager::remove);
-
-              return Response.status(Response.Status.ACCEPTED).entity(manager.getAllLookupsState()).build();
-            }
-            catch (Exception e) {
-              LOG.error(e, "Error handling request");
-              return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
-            }
-          }
-
-          @Override
-          public Object post(final Map<String, LookupExtractorFactory> lookups)
-          {
-            final Map<String, LookupExtractorFactory> failedUpdates = new HashMap<>();
-            for (final String name : lookups.keySet()) {
-
-              final LookupExtractorFactoryContainer factoryContainer = new LookupExtractorFactoryContainer(
-                  null,
-                  lookups.get(name)
-              );
-
-              manager.add(name, factoryContainer);
-            }
-            return ImmutableMap.of("status", "accepted", LookupModule.FAILED_UPDATES_KEY, failedUpdates);
-          }
-
-          @Override
-          public Object get(String id)
-          {
-            return manager.get(id);
-          }
-
-          @Override
-          public LookupsState<LookupExtractorFactoryContainer> getAll()
-          {
-            return manager.getAllLookupsState();
-          }
-
-          @Override
-          public Object delete(String id)
-          {
-            manager.remove(id);
-            return id;
-          }
-        }
-    );
-  }
-}
-
-@Deprecated
-class LookupResourceListenerAnnouncer extends ListenerResourceAnnouncer
-{
-  @Inject
-  public LookupResourceListenerAnnouncer(
-      Announcer announcer,
-      LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig,
-      @Self DruidNode node
-  )
-  {
-    super(
-        announcer,
-        lookupListeningAnnouncerConfig,
-        lookupListeningAnnouncerConfig.getLookupKey(),
-        HostAndPortWithScheme.fromString(node.getServiceScheme(), node.getHostAndPortToUse())
-    );
-  }
-}
-
-
-class LookupListeningAnnouncerConfig extends ListeningAnnouncerConfig
-{
-  public static final String DEFAULT_TIER = "__default";
-  private final DataSourceTaskIdHolder dataSourceTaskIdHolder;
-  @JsonProperty("lookupTier")
-  private String lookupTier = null;
-  @JsonProperty("lookupTierIsDatasource")
-  private boolean lookupTierIsDatasource = false;
-
-  @JsonCreator
-  public LookupListeningAnnouncerConfig(
-      @JacksonInject ZkPathsConfig zkPathsConfig,
-      @JacksonInject DataSourceTaskIdHolder dataSourceTaskIdHolder
-  )
-  {
-    super(zkPathsConfig);
-    this.dataSourceTaskIdHolder = dataSourceTaskIdHolder;
-  }
-
-  public String getLookupTier()
-  {
-    Preconditions.checkArgument(
-        !(lookupTierIsDatasource && null != lookupTier),
-        "Cannot specify both `lookupTier` and `lookupTierIsDatasource`"
-    );
-    final String lookupTier = lookupTierIsDatasource ? dataSourceTaskIdHolder.getDataSource() : this.lookupTier;
-
-    return Preconditions.checkNotNull(
-        lookupTier == null ? DEFAULT_TIER : StringUtils.emptyToNullNonDruidDataString(lookupTier),
-        "Cannot have empty lookup tier from %s",
-        lookupTierIsDatasource ? "bound value" : LookupModule.PROPERTY_BASE
-    );
-  }
-
-  public String getLookupKey()
-  {
-    return LookupModule.getTierListenerPath(getLookupTier());
-  }
-}
diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
index 0bb93eb..8bfab95 100644
--- a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
+++ b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
@@ -67,14 +67,17 @@ import java.util.concurrent.locks.LockSupport;
 import java.util.function.Function;
 
 /**
- * This class provide a basic {@link LookupExtractorFactory} references manager.
- * It allows basic operations fetching, listing, adding and deleting of {@link LookupExtractor} objects
- * It is be used by queries to fetch the lookup reference.
- * It is used by Lookup configuration manager to add/remove or list lookups configuration via HTTP or other protocols.
- * It does periodic snap shot of the list of lookup in order to bootstrap nodes after restart.
+ * This class provides a basic {@link LookupExtractorFactory} references manager. It allows basic operations fetching,
+ * listing, adding and deleting of {@link LookupExtractor} objects, and can take periodic snapshots of the loaded lookup
+ * extractor specifications in order to bootstrap nodes after restart.
+ *
+ * It also implements {@link LookupExtractorFactoryContainerProvider}, to supply queries and indexing transformations
+ * with a reference to a {@link LookupExtractorFactoryContainer}. This class is a companion of
+ * {@link org.apache.druid.server.lookup.cache.LookupCoordinatorManager}, which communicates with
+ * {@link LookupReferencesManager} through {@link LookupListeningResource}.
  */
 @ManageLifecycle
-public class LookupReferencesManager
+public class LookupReferencesManager implements LookupExtractorFactoryContainerProvider
 {
   private static final EmittingLogger LOG = new EmittingLogger(LookupReferencesManager.class);
 
@@ -149,11 +152,11 @@ public class LookupReferencesManager
       throw new ISE("can't start.");
     }
     try {
-      LOG.info("LookupReferencesManager is starting.");
+      LOG.info("LookupExtractorFactoryContainerProvider is starting.");
       loadAllLookupsAndInitStateRef();
       if (!testMode) {
         mainThread = Execs.makeThread(
-            "LookupReferencesManager-MainThread",
+            "LookupExtractorFactoryContainerProvider-MainThread",
             () -> {
               try {
                 if (!lifecycleLock.awaitStarted()) {
@@ -184,7 +187,7 @@ public class LookupReferencesManager
         mainThread.start();
       }
 
-      LOG.info("LookupReferencesManager is started.");
+      LOG.info("LookupExtractorFactoryContainerProvider is started.");
       lifecycleLock.started();
     }
     finally {
@@ -231,7 +234,7 @@ public class LookupReferencesManager
       throw new ISE("can't stop.");
     }
 
-    LOG.info("LookupReferencesManager is stopping.");
+    LOG.info("LookupExtractorFactoryContainerProvider is stopping.");
 
     if (!testMode) {
       mainThread.interrupt();
@@ -256,9 +259,10 @@ public class LookupReferencesManager
       }
     }
 
-    LOG.info("LookupReferencesManager is stopped.");
+    LOG.info("LookupExtractorFactoryContainerProvider is stopped.");
   }
 
+
   public void add(String lookupName, LookupExtractorFactoryContainer lookupExtractorFactoryContainer)
   {
     Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
@@ -289,6 +293,7 @@ public class LookupReferencesManager
     LockSupport.unpark(mainThread);
   }
 
+  @Override
   @Nullable
   public LookupExtractorFactoryContainer get(String lookupName)
   {
@@ -297,7 +302,7 @@ public class LookupReferencesManager
   }
 
   // Note that this should ensure that "toLoad" and "toDrop" are disjoint.
-  public LookupsState<LookupExtractorFactoryContainer> getAllLookupsState()
+  LookupsState<LookupExtractorFactoryContainer> getAllLookupsState()
   {
     Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
 
@@ -470,7 +475,7 @@ public class LookupReferencesManager
     final ImmutableMap.Builder<String, LookupExtractorFactoryContainer> builder = ImmutableMap.builder();
     final ExecutorService executorService = Execs.multiThreaded(
         lookupConfig.getNumLookupLoadingThreads(),
-        "LookupReferencesManager-Startup-%s"
+        "LookupExtractorFactoryContainerProvider-Startup-%s"
     );
     final CompletionService<Map.Entry<String, LookupExtractorFactoryContainer>> completionService =
         new ExecutorCompletionService<>(executorService);
diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupResourceListenerAnnouncer.java b/server/src/main/java/org/apache/druid/query/lookup/LookupResourceListenerAnnouncer.java
new file mode 100644
index 0000000..d58e2b6
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/query/lookup/LookupResourceListenerAnnouncer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.lookup;
+
+import com.google.inject.Inject;
+import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.http.HostAndPortWithScheme;
+import org.apache.druid.server.listener.announcer.ListenerResourceAnnouncer;
+
+@Deprecated
+class LookupResourceListenerAnnouncer extends ListenerResourceAnnouncer
+{
+  @Inject
+  public LookupResourceListenerAnnouncer(
+      Announcer announcer,
+      LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig,
+      @Self DruidNode node
+  )
+  {
+    super(
+        announcer,
+        lookupListeningAnnouncerConfig,
+        lookupListeningAnnouncerConfig.getLookupKey(),
+        HostAndPortWithScheme.fromString(node.getServiceScheme(), node.getHostAndPortToUse())
+    );
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupSerdeModule.java b/server/src/main/java/org/apache/druid/query/lookup/LookupSerdeModule.java
new file mode 100644
index 0000000..3a71377
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/query/lookup/LookupSerdeModule.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.lookup;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.query.dimension.LookupDimensionSpec;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * Variant of {@link LookupModule} that only supports serde of {@link org.apache.druid.query.Query} objects, to allow
+ * a service to examine queries that might contain for example a {@link RegisteredLookupExtractionFn}, but without
+ * requiring the service to load the actual lookups.
+ */
+public class LookupSerdeModule implements DruidModule
+{
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    return ImmutableList.<Module>of(
+        new SimpleModule("DruidLookupModule").registerSubtypes(MapLookupExtractorFactory.class),
+        new SimpleModule().registerSubtypes(
+            new NamedType(LookupDimensionSpec.class, "lookup"),
+            new NamedType(RegisteredLookupExtractionFn.class, "registeredLookup")
+        )
+    );
+  }
+
+  @Override
+  public void configure(Binder binder)
+  {
+    JsonConfigProvider.bind(binder, LookupModule.PROPERTY_BASE, LookupConfig.class);
+    binder.bind(LookupExtractorFactoryContainerProvider.class).to(NoopLookupExtractorFactoryContainerProvider.class);
+  }
+
+  /**
+   * Anything using this module doesn't actually need lookups, but the objects that get materialized during
+   * deserialization expect a {@link LookupExtractorFactoryContainerProvider} to exist, so this one returns nulls.
+   */
+  private static class NoopLookupExtractorFactoryContainerProvider implements LookupExtractorFactoryContainerProvider
+  {
+    @Nullable
+    @Override
+    public LookupExtractorFactoryContainer get(String lookupName)
+    {
+      return null;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java
index 0f90c83..86acfa3 100644
--- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java
+++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java
@@ -85,7 +85,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- *
+ * Manages {@link org.apache.druid.query.lookup.LookupExtractorFactoryContainer} specifications, distributing them
+ * to {@link org.apache.druid.query.lookup.LookupReferencesManager} around the cluster by monitoring the lookup
+ * announce path for servers and utilizing their {@link org.apache.druid.query.lookup.LookupListeningResource} API
+ * to load, drop, and update lookups around the cluster.
  */
 public class LookupCoordinatorManager
 {
diff --git a/server/src/test/java/org/apache/druid/query/dimension/LookupDimensionSpecTest.java b/server/src/test/java/org/apache/druid/query/dimension/LookupDimensionSpecTest.java
index f5dd8d0..fb5d1b4 100644
--- a/server/src/test/java/org/apache/druid/query/dimension/LookupDimensionSpecTest.java
+++ b/server/src/test/java/org/apache/druid/query/dimension/LookupDimensionSpecTest.java
@@ -31,7 +31,7 @@ import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.extraction.MapLookupExtractor;
 import org.apache.druid.query.lookup.LookupExtractor;
 import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
-import org.apache.druid.query.lookup.LookupReferencesManager;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.query.lookup.MapLookupExtractorFactory;
 import org.apache.druid.segment.TestHelper;
 import org.easymock.EasyMock;
@@ -49,7 +49,8 @@ public class LookupDimensionSpecTest
   private static final Map<String, String> STRING_MAP = ImmutableMap.of("key", "value", "key2", "value2");
   private static LookupExtractor MAP_LOOKUP_EXTRACTOR = new MapLookupExtractor(STRING_MAP, true);
 
-  private static final LookupReferencesManager LOOKUP_REF_MANAGER = EasyMock.createMock(LookupReferencesManager.class);
+  private static final LookupExtractorFactoryContainerProvider LOOKUP_REF_MANAGER =
+      EasyMock.createMock(LookupExtractorFactoryContainerProvider.class);
 
   static {
     EasyMock
@@ -60,7 +61,7 @@ public class LookupDimensionSpecTest
   }
 
   private final DimensionSpec lookupDimSpec =
-      new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, null, null, null, true);
+      new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, null, null, true, null);
 
 
   @Parameters
@@ -70,7 +71,7 @@ public class LookupDimensionSpecTest
     ObjectMapper mapper = new DefaultObjectMapper();
     mapper.registerSubtypes(new NamedType(LookupDimensionSpec.class, "lookup"));
     InjectableValues injectableValues = new InjectableValues.Std().addValue(
-        LookupReferencesManager.class,
+        LookupExtractorFactoryContainerProvider.class,
         LOOKUP_REF_MANAGER
     );
     String serLookup = mapper.writeValueAsString(lookupDimSpec);
@@ -80,23 +81,23 @@ public class LookupDimensionSpecTest
   private Object[] parametersForTestSerDesr()
   {
     return new Object[]{
-        new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, true, null, null, null, true),
-        new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, "Missing_value", null, null, true),
-        new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, null, null, null, true),
-        new LookupDimensionSpec("dimName", "outputName", null, false, null, "name", LOOKUP_REF_MANAGER, true)
+        new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, true, null, null, true, null),
+        new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, "Missing_value", null, true, null),
+        new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, null, null, true, null),
+        new LookupDimensionSpec("dimName", "outputName", null, false, null, "name", true, LOOKUP_REF_MANAGER)
     };
   }
 
   @Test(expected = Exception.class)
   public void testExceptionWhenNameAndLookupNotNull()
   {
-    new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, "replace", "name", null, true);
+    new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, "replace", "name", true, null);
   }
 
   @Test(expected = Exception.class)
   public void testExceptionWhenNameAndLookupNull()
   {
-    new LookupDimensionSpec("dimName", "outputName", null, false, "replace", "", null, true);
+    new LookupDimensionSpec("dimName", "outputName", null, false, "replace", "", true, null);
   }
 
   @Test
@@ -115,39 +116,41 @@ public class LookupDimensionSpecTest
   {
     return new Object[]{
         new Object[]{
-            new LookupDimensionSpec("dimName", "outputName", null, true, null, "lookupName", LOOKUP_REF_MANAGER, true),
+            new LookupDimensionSpec("dimName", "outputName", null, true, null, "lookupName", true, LOOKUP_REF_MANAGER),
             STRING_MAP
         },
         new Object[]{
-            new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, true, null, null, null, true),
+            new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, true, null, null, true, null),
             STRING_MAP
         },
         new Object[]{
-            new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, null, null, null, true),
+            new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, null, null, true, null),
             TestHelper.createExpectedMap("not there", null)
         },
         new Object[]{
-            new LookupDimensionSpec("dimName", "outputName", null, false, null, "lookupName", LOOKUP_REF_MANAGER, true),
+            new LookupDimensionSpec("dimName", "outputName", null, false, null, "lookupName", true, LOOKUP_REF_MANAGER),
             TestHelper.createExpectedMap("not there", null)
         },
         new Object[]{
-            new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, "Missing_value", null, null,
-                                    true
+            new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, "Missing_value", null,
+                                    true,
+                                    null
             ),
             ImmutableMap.of("not there", "Missing_value")
         },
         new Object[]{
-            new LookupDimensionSpec("dimName", "outputName", null, false, "Missing_value", "lookupName", LOOKUP_REF_MANAGER,
-                                    true
+            new LookupDimensionSpec("dimName", "outputName", null, false, "Missing_value", "lookupName",
+                                    true,
+                                    LOOKUP_REF_MANAGER
             ),
             ImmutableMap.of("not there", "Missing_value")
         },
         new Object[]{
-            new LookupDimensionSpec("dimName", "outputName", null, true, null, "lookupName", LOOKUP_REF_MANAGER, true),
+            new LookupDimensionSpec("dimName", "outputName", null, true, null, "lookupName", true, LOOKUP_REF_MANAGER),
             ImmutableMap.of("not there", "not there")
         },
         new Object[]{
-            new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, true, null, "", null, true),
+            new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, true, null, "", true, null),
             ImmutableMap.of("not there", "not there")
         }
 
@@ -170,29 +173,30 @@ public class LookupDimensionSpecTest
   {
     return new Object[]{
         new Object[]{
-            new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, true, null, null, null, true),
+            new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, true, null, null, true, null),
             false
         },
         new Object[]{
-            new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, "Missing_value", null, null,
-                                    true
+            new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, "Missing_value", null,
+                                    true,
+                                    null
             ),
             false
         },
         new Object[]{
-            new LookupDimensionSpec("dimName", "outputName2", MAP_LOOKUP_EXTRACTOR, false, null, null, null, true),
+            new LookupDimensionSpec("dimName", "outputName2", MAP_LOOKUP_EXTRACTOR, false, null, null, true, null),
             false
         },
         new Object[]{
-            new LookupDimensionSpec("dimName2", "outputName2", MAP_LOOKUP_EXTRACTOR, false, null, null, null, true),
+            new LookupDimensionSpec("dimName2", "outputName2", MAP_LOOKUP_EXTRACTOR, false, null, null, true, null),
             false
         },
         new Object[]{
-            new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, null, null, null, true),
+            new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, null, null, true, null),
             true
         },
         new Object[]{
-            new LookupDimensionSpec("dimName", "outputName", null, false, null, "name", LOOKUP_REF_MANAGER, true),
+            new LookupDimensionSpec("dimName", "outputName", null, false, null, "name", true, LOOKUP_REF_MANAGER),
             false
         }
     };
diff --git a/server/src/test/java/org/apache/druid/query/expression/LookupEnabledTestExprMacroTable.java b/server/src/test/java/org/apache/druid/query/expression/LookupEnabledTestExprMacroTable.java
index 58caacf..6c36b82 100644
--- a/server/src/test/java/org/apache/druid/query/expression/LookupEnabledTestExprMacroTable.java
+++ b/server/src/test/java/org/apache/druid/query/expression/LookupEnabledTestExprMacroTable.java
@@ -27,8 +27,8 @@ import org.apache.druid.query.extraction.MapLookupExtractor;
 import org.apache.druid.query.lookup.LookupExtractor;
 import org.apache.druid.query.lookup.LookupExtractorFactory;
 import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.query.lookup.LookupIntrospectHandler;
-import org.apache.druid.query.lookup.LookupReferencesManager;
 import org.easymock.EasyMock;
 
 import javax.annotation.Nullable;
@@ -36,7 +36,7 @@ import java.util.Collections;
 
 /**
  * Separate from {@link TestExprMacroTable} since that one is in druid-processing, which doesn't have
- * {@link LookupReferencesManager}.
+ * {@link LookupExtractorFactoryContainerProvider}.
  */
 public class LookupEnabledTestExprMacroTable extends ExprMacroTable
 {
@@ -57,12 +57,15 @@ public class LookupEnabledTestExprMacroTable extends ExprMacroTable
   }
 
   /**
-   * Returns a mock {@link LookupReferencesManager} that has one lookup, "lookyloo".
+   * Returns a mock {@link LookupExtractorFactoryContainerProvider} that has one lookup, "lookyloo".
    */
-  public static LookupReferencesManager createTestLookupReferencesManager(final ImmutableMap<String, String> theLookup)
+  public static LookupExtractorFactoryContainerProvider createTestLookupReferencesManager(
+      final ImmutableMap<String, String> theLookup
+  )
   {
-    final LookupReferencesManager lookupReferencesManager = EasyMock.createMock(LookupReferencesManager.class);
-    EasyMock.expect(lookupReferencesManager.get(EasyMock.eq("lookyloo"))).andReturn(
+    final LookupExtractorFactoryContainerProvider lookupExtractorFactoryContainerProvider =
+        EasyMock.createMock(LookupExtractorFactoryContainerProvider.class);
+    EasyMock.expect(lookupExtractorFactoryContainerProvider.get(EasyMock.eq("lookyloo"))).andReturn(
         new LookupExtractorFactoryContainer(
             "v0",
             new LookupExtractorFactory()
@@ -99,8 +102,8 @@ public class LookupEnabledTestExprMacroTable extends ExprMacroTable
             }
         )
     ).anyTimes();
-    EasyMock.expect(lookupReferencesManager.get(EasyMock.not(EasyMock.eq("lookyloo")))).andReturn(null).anyTimes();
-    EasyMock.replay(lookupReferencesManager);
-    return lookupReferencesManager;
+    EasyMock.expect(lookupExtractorFactoryContainerProvider.get(EasyMock.not(EasyMock.eq("lookyloo")))).andReturn(null).anyTimes();
+    EasyMock.replay(lookupExtractorFactoryContainerProvider);
+    return lookupExtractorFactoryContainerProvider;
   }
 }
diff --git a/server/src/test/java/org/apache/druid/query/lookup/LookupIntrospectionResourceImplTest.java b/server/src/test/java/org/apache/druid/query/lookup/LookupIntrospectionResourceImplTest.java
deleted file mode 100644
index 3236fff..0000000
--- a/server/src/test/java/org/apache/druid/query/lookup/LookupIntrospectionResourceImplTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.lookup;
-
-import com.google.common.collect.ImmutableMap;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.core.ClassNamesResourceConfig;
-import com.sun.jersey.spi.container.servlet.WebComponent;
-import com.sun.jersey.spi.inject.SingletonTypeInjectableProvider;
-import com.sun.jersey.test.framework.JerseyTest;
-import com.sun.jersey.test.framework.WebAppDescriptor;
-import com.sun.jersey.test.framework.spi.container.TestContainerFactory;
-import com.sun.jersey.test.framework.spi.container.grizzly2.GrizzlyTestContainerFactory;
-import org.easymock.EasyMock;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import javax.ws.rs.core.Context;
-import javax.ws.rs.ext.Provider;
-
-public class LookupIntrospectionResourceImplTest extends JerseyTest
-{
-
-  static LookupReferencesManager lookupReferencesManager = EasyMock.createMock(LookupReferencesManager.class);
-
-
-  @Override
-  @Before
-  public void setUp() throws Exception
-  {
-    super.setUp();
-    EasyMock.reset(lookupReferencesManager);
-    LookupExtractorFactory lookupExtractorFactory1 = new MapLookupExtractorFactory(ImmutableMap.of(
-        "key",
-        "value",
-        "key2",
-        "value2"
-    ), false);
-    EasyMock.expect(lookupReferencesManager.get("lookupId1")).andReturn(
-        new LookupExtractorFactoryContainer(
-            "v0",
-            lookupExtractorFactory1
-        )
-    ).anyTimes();
-    EasyMock.replay(lookupReferencesManager);
-  }
-
-  @Provider
-  public static class MockTodoServiceProvider extends
-      SingletonTypeInjectableProvider<Context, LookupReferencesManager>
-  {
-    public MockTodoServiceProvider()
-    {
-      super(LookupReferencesManager.class, lookupReferencesManager);
-    }
-  }
-
-
-  public LookupIntrospectionResourceImplTest()
-  {
-    super(new WebAppDescriptor.Builder().initParam(
-                                            WebComponent.RESOURCE_CONFIG_CLASS,
-                                            ClassNamesResourceConfig.class.getName()
-                                        )
-                                        .initParam(
-                                            ClassNamesResourceConfig.PROPERTY_CLASSNAMES,
-                                            LookupIntrospectionResource.class.getName()
-                                            + ';'
-                                            + MockTodoServiceProvider.class.getName()
-                                            + ';'
-                                            + LookupIntrospectHandler.class.getName()
-                                        )
-                                        .build());
-  }
-
-  @Override
-  protected TestContainerFactory getTestContainerFactory()
-  {
-    return new GrizzlyTestContainerFactory();
-  }
-
-
-  @Test
-  public void testGetKey()
-  {
-
-    WebResource r = resource().path("/druid/v1/lookups/introspect/lookupId1/keys");
-    String s = r.get(String.class);
-    Assert.assertEquals("[key, key2]", s);
-  }
-
-  @Test
-  public void testGetValue()
-  {
-    WebResource r = resource().path("/druid/v1/lookups/introspect/lookupId1/values");
-    String s = r.get(String.class);
-    Assert.assertEquals("[value, value2]", s);
-  }
-
-  @Test
-  public void testGetMap()
-  {
-    WebResource r = resource().path("/druid/v1/lookups/introspect/lookupId1/");
-    String s = r.get(String.class);
-    Assert.assertEquals("{\"key\":\"value\",\"key2\":\"value2\"}", s);
-  }
-}
diff --git a/server/src/test/java/org/apache/druid/query/lookup/LookupIntrospectionResourceTest.java b/server/src/test/java/org/apache/druid/query/lookup/LookupIntrospectionResourceTest.java
index d605f36..37cab51 100644
--- a/server/src/test/java/org/apache/druid/query/lookup/LookupIntrospectionResourceTest.java
+++ b/server/src/test/java/org/apache/druid/query/lookup/LookupIntrospectionResourceTest.java
@@ -20,120 +20,159 @@
 package org.apache.druid.query.lookup;
 
 import com.google.common.collect.ImmutableMap;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
 import org.apache.druid.query.extraction.MapLookupExtractor;
+import org.apache.druid.server.WebserverTestUtils;
 import org.easymock.EasyMock;
+import org.glassfish.grizzly.http.server.HttpServer;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
-import javax.annotation.Nullable;
-import javax.ws.rs.POST;
+import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import java.io.InputStream;
+import java.net.URI;
 
 public class LookupIntrospectionResourceTest
 {
+  private static LookupExtractorFactory mockLookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class);
 
-  LookupReferencesManager lookupReferencesManager = EasyMock.createMock(LookupReferencesManager.class);
-  LookupExtractorFactory lookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class);
-  LookupIntrospectHandler lookupIntrospectHandler = EasyMock.createMock(LookupIntrospectHandler.class);
+  private static LookupExtractorFactoryContainerProvider mockLookupExtractorFactoryContainerProvider =
+      EasyMock.createMock(LookupExtractorFactoryContainerProvider.class);
 
-  LookupIntrospectionResource lookupIntrospectionResource = new LookupIntrospectionResource(lookupReferencesManager);
+  private static LookupIntrospectHandler mockLookupIntrospectHandler =
+      EasyMock.createMock(LookupIntrospectHandler.class);
+
+  private LookupIntrospectionResource lookupIntrospectionResource =
+      new LookupIntrospectionResource(mockLookupExtractorFactoryContainerProvider);
+
+  private URI baseUri;
+  private HttpServer server;
 
   @Before
-  public void setUp()
+  public void setup() throws Exception
   {
-    EasyMock.expect(lookupReferencesManager.get("lookupId")).andReturn(
+    LookupExtractorFactory actualLookupExtractorFactory = new MapLookupExtractorFactory(
+        ImmutableMap.of("key", "value", "key2", "value2"),
+        false
+    );
+
+    EasyMock.reset(mockLookupExtractorFactoryContainerProvider);
+    EasyMock.reset(mockLookupExtractorFactory);
+    EasyMock.reset(mockLookupIntrospectHandler);
+    EasyMock.expect(mockLookupExtractorFactoryContainerProvider.get("lookupId")).andReturn(
+        new LookupExtractorFactoryContainer(
+            "v0",
+            mockLookupExtractorFactory
+        )
+    ).anyTimes();
+    EasyMock.expect(mockLookupExtractorFactoryContainerProvider.get("lookupId1")).andReturn(
         new LookupExtractorFactoryContainer(
             "v0",
-            lookupExtractorFactory
+            actualLookupExtractorFactory
         )
     ).anyTimes();
-    EasyMock.expect(lookupReferencesManager.get(EasyMock.anyString())).andReturn(null).anyTimes();
-    EasyMock.replay(lookupReferencesManager);
+
+    EasyMock.expect(mockLookupExtractorFactoryContainerProvider.get(EasyMock.anyString())).andReturn(null).anyTimes();
+    EasyMock.replay(mockLookupExtractorFactoryContainerProvider);
+
+    baseUri = WebserverTestUtils.createBaseUri();
+    server = WebserverTestUtils.createServer(
+        "lookup-test",
+        baseUri,
+        LookupIntrospectionResource.class.getName(),
+        binder -> {
+          binder.bind(LookupExtractorFactoryContainerProvider.class)
+                .toInstance(mockLookupExtractorFactoryContainerProvider);
+        }
+    );
+    server.start();
+  }
+
+  @After
+  public void teardown()
+  {
+    if (server != null) {
+      server.stop();
+    }
   }
+
   @Test
   public void testNotImplementedIntrospectLookup()
   {
-    EasyMock.expect(lookupExtractorFactory.getIntrospectHandler()).andReturn(null);
-    EasyMock.expect(lookupExtractorFactory.get()).andReturn(new MapLookupExtractor(ImmutableMap.of(), false)).anyTimes();
-    EasyMock.replay(lookupExtractorFactory);
-    Assert.assertEquals(Response.status(Response.Status.NOT_FOUND).build().getStatus(), ((Response) lookupIntrospectionResource.introspectLookup("lookupId")).getStatus());
+    EasyMock.expect(mockLookupExtractorFactory.getIntrospectHandler()).andReturn(null);
+    EasyMock.expect(mockLookupExtractorFactory.get())
+            .andReturn(new MapLookupExtractor(ImmutableMap.of(), false))
+            .anyTimes();
+    EasyMock.replay(mockLookupExtractorFactory);
+    Assert.assertEquals(
+        Response.status(Response.Status.NOT_FOUND).build().getStatus(),
+        ((Response) lookupIntrospectionResource.introspectLookup("lookupId")).getStatus()
+    );
   }
 
-
   @Test
   public void testNotExistingLookup()
   {
-    Assert.assertEquals(Response.status(Response.Status.NOT_FOUND).build().getStatus(), ((Response) lookupIntrospectionResource.introspectLookup("not there")).getStatus());
+    Assert.assertEquals(
+        Response.status(Response.Status.NOT_FOUND).build().getStatus(),
+        ((Response) lookupIntrospectionResource.introspectLookup("not there")).getStatus()
+    );
   }
 
   @Test public void testExistingLookup()
   {
-    EasyMock.expect(lookupExtractorFactory.getIntrospectHandler()).andReturn(lookupIntrospectHandler);
-    EasyMock.expect(lookupExtractorFactory.get()).andReturn(new MapLookupExtractor(ImmutableMap.of(), false)).anyTimes();
-    EasyMock.replay(lookupExtractorFactory);
-    Assert.assertEquals(lookupIntrospectHandler, lookupIntrospectionResource.introspectLookup("lookupId"));
+    EasyMock.expect(mockLookupExtractorFactory.getIntrospectHandler()).andReturn(mockLookupIntrospectHandler);
+    EasyMock.expect(mockLookupExtractorFactory.get())
+            .andReturn(new MapLookupExtractor(ImmutableMap.of(), false))
+            .anyTimes();
+    EasyMock.replay(mockLookupExtractorFactory);
+    Assert.assertEquals(mockLookupIntrospectHandler, lookupIntrospectionResource.introspectLookup("lookupId"));
   }
+
   @Test
-  @Ignore
-  public void testIntrospection()
+  public void testGetKey()
   {
+    Client client = Client.create(new DefaultClientConfig());
+    WebResource service = client.resource(baseUri);
+
+    ClientResponse resp = service.path("/druid/v1/lookups/introspect/lookupId1/keys")
+                                 .accept(MediaType.APPLICATION_JSON)
+                                 .get(ClientResponse.class);
+    String s = resp.getEntity(String.class);
+    Assert.assertEquals("[key, key2]", s);
+    Assert.assertEquals(200, resp.getStatus());
+  }
 
-    LookupIntrospectHandler lookupIntrospectHandler = new LookupIntrospectHandler()
-    {
-      @POST
-      public Response postMock(InputStream inputStream)
-      {
-        return Response.ok().build();
-      }
-    };
-
-    LookupExtractorFactory lookupExtractorFactory1 = new LookupExtractorFactory()
-    {
-      final LookupExtractor mapLookup = new MapLookupExtractor(ImmutableMap.of("key", "value"), true);
-
-      @Override
-      public boolean start()
-      {
-        return true;
-      }
-
-      @Override
-      public boolean close()
-      {
-        return true;
-      }
-
-      @Override
-      public boolean replaces(@Nullable LookupExtractorFactory other)
-      {
-        return true;
-      }
-
-      @Nullable
-      @Override
-      public LookupIntrospectHandler getIntrospectHandler()
-      {
-        return null;
-      }
-
-      @Override
-      public LookupExtractor get()
-      {
-        return mapLookup;
-      }
-    };
-
-    LookupIntrospectionResource lookupIntrospectionResource = new LookupIntrospectionResource(lookupReferencesManager);
-    EasyMock.expect(lookupReferencesManager.get("lookupId1")).andReturn(
-        new LookupExtractorFactoryContainer(
-            "v0",
-            lookupExtractorFactory1
-        )
-    ).anyTimes();
-    EasyMock.replay(lookupReferencesManager);
+  @Test
+  public void testGetValue()
+  {
+    Client client = Client.create(new DefaultClientConfig());
+    WebResource service = client.resource(baseUri);
+
+    ClientResponse resp = service.path("/druid/v1/lookups/introspect/lookupId1/values")
+                                 .accept(MediaType.APPLICATION_JSON)
+                                 .get(ClientResponse.class);
+    String s = resp.getEntity(String.class);
+    Assert.assertEquals("[value, value2]", s);
+    Assert.assertEquals(200, resp.getStatus());
+  }
 
+  @Test
+  public void testGetMap()
+  {
+    Client client = Client.create(new DefaultClientConfig());
+    WebResource service = client.resource(baseUri);
+
+    ClientResponse resp = service.path("/druid/v1/lookups/introspect/lookupId1/")
+                                 .accept(MediaType.APPLICATION_JSON)
+                                 .get(ClientResponse.class);
+    String s = resp.getEntity(String.class);
+    Assert.assertEquals("{\"key\":\"value\",\"key2\":\"value2\"}", s);
+    Assert.assertEquals(200, resp.getStatus());
   }
 }
diff --git a/server/src/test/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFnTest.java b/server/src/test/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFnTest.java
index cce4cc3..147f79c 100644
--- a/server/src/test/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFnTest.java
+++ b/server/src/test/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFnTest.java
@@ -50,7 +50,7 @@ public class RegisteredLookupExtractionFnTest
   @Test
   public void testSimpleDelegation()
   {
-    final LookupReferencesManager manager = EasyMock.createStrictMock(LookupReferencesManager.class);
+    final LookupExtractorFactoryContainerProvider manager = EasyMock.createStrictMock(LookupReferencesManager.class);
     managerReturnsMap(manager);
     EasyMock.replay(manager);
     final RegisteredLookupExtractionFn fn = new RegisteredLookupExtractionFn(
@@ -75,7 +75,7 @@ public class RegisteredLookupExtractionFnTest
   @Test
   public void testInheritInjective()
   {
-    final LookupReferencesManager manager = EasyMock.createStrictMock(LookupReferencesManager.class);
+    final LookupExtractorFactoryContainerProvider manager = EasyMock.createStrictMock(LookupReferencesManager.class);
     managerReturnsMap(manager);
     EasyMock.replay(manager);
     final RegisteredLookupExtractionFn fn = new RegisteredLookupExtractionFn(
@@ -95,7 +95,7 @@ public class RegisteredLookupExtractionFnTest
   @Test
   public void testMissingDelegation()
   {
-    final LookupReferencesManager manager = EasyMock.createStrictMock(LookupReferencesManager.class);
+    final LookupExtractorFactoryContainerProvider manager = EasyMock.createStrictMock(LookupReferencesManager.class);
     EasyMock.expect(manager.get(EasyMock.eq(LOOKUP_NAME))).andReturn(null).once();
     EasyMock.replay(manager);
 
@@ -134,7 +134,7 @@ public class RegisteredLookupExtractionFnTest
   {
     final ObjectMapper mapper = new DefaultObjectMapper();
 
-    final LookupReferencesManager manager = EasyMock.createStrictMock(LookupReferencesManager.class);
+    final LookupExtractorFactoryContainerProvider manager = EasyMock.createStrictMock(LookupReferencesManager.class);
     managerReturnsMap(manager);
     EasyMock.replay(manager);
     final RegisteredLookupExtractionFn fn = new RegisteredLookupExtractionFn(
@@ -159,7 +159,7 @@ public class RegisteredLookupExtractionFnTest
   @Test
   public void testEquals()
   {
-    final LookupReferencesManager manager = EasyMock.createStrictMock(LookupReferencesManager.class);
+    final LookupExtractorFactoryContainerProvider manager = EasyMock.createStrictMock(LookupReferencesManager.class);
     managerReturnsMap(manager);
     EasyMock.replay(manager);
     final RegisteredLookupExtractionFn fn = new RegisteredLookupExtractionFn(
@@ -245,7 +245,7 @@ public class RegisteredLookupExtractionFnTest
     EasyMock.verify(manager);
   }
 
-  private void managerReturnsMap(LookupReferencesManager manager)
+  private void managerReturnsMap(LookupExtractorFactoryContainerProvider manager)
   {
     EasyMock.expect(manager.get(EasyMock.eq(LOOKUP_NAME))).andReturn(
         new LookupExtractorFactoryContainer(
diff --git a/server/src/test/java/org/apache/druid/server/WebserverTestUtils.java b/server/src/test/java/org/apache/druid/server/WebserverTestUtils.java
new file mode 100644
index 0000000..2450efa
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/WebserverTestUtils.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.name.Names;
+import com.sun.jersey.api.container.grizzly2.GrizzlyServerFactory;
+import com.sun.jersey.api.core.ClassNamesResourceConfig;
+import com.sun.jersey.api.core.ResourceConfig;
+import com.sun.jersey.core.spi.component.ioc.IoCComponentProviderFactory;
+import com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory;
+import com.sun.jersey.spi.inject.SingletonTypeInjectableProvider;
+import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.guice.annotations.Client;
+import org.apache.druid.initialization.Initialization;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.server.security.AuthenticatorMapper;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.easymock.EasyMock;
+import org.glassfish.grizzly.http.server.HttpServer;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.ext.Provider;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+
+public class WebserverTestUtils
+{
+
+  public static URI createBaseUri()
+  {
+    final int port = ThreadLocalRandom.current().nextInt(1024, 65534);
+    return UriBuilder.fromUri("http://localhost/").port(port).build();
+  }
+
+  public static HttpServer createServer(
+      String SERVICE_NAME,
+      URI baseUri,
+      String resourceClassName,
+      Consumer<Binder> extender
+  )
+      throws IOException
+  {
+    Injector injector = Initialization.makeInjectorWithModules(
+        GuiceInjectors.makeStartupInjector(),
+        ImmutableList.of(binder -> {
+          binder.bindConstant().annotatedWith(Names.named("serviceName")).to(SERVICE_NAME);
+          binder.bindConstant().annotatedWith(Names.named("servicePort")).to(baseUri.getPort());
+          binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(baseUri.getPort() + 1);
+          binder.bind(Key.get(ServiceEmitter.class)).toInstance(new NoopServiceEmitter());
+          binder.bind(Key.get(AuthConfig.class)).toInstance(new AuthConfig());
+          binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
+          binder.bind(AuthenticatorMapper.class).toInstance(AuthTestUtils.TEST_AUTHENTICATOR_MAPPER);
+          binder.bind(Key.get(HttpClient.class, Client.class)).toInstance(EasyMock.createMock(HttpClient.class));
+          extender.accept(binder);
+        })
+    );
+    ResourceConfig resourceConfig = new ClassNamesResourceConfig(
+        resourceClassName
+        + ';'
+        + MockHttpServletRequest.class.getName()
+    );
+    IoCComponentProviderFactory ioc = new GuiceComponentProviderFactory(resourceConfig, injector);
+    HttpServer server = GrizzlyServerFactory.createHttpServer(baseUri, resourceConfig, ioc);
+    return server;
+  }
+
+  @Provider
+  public static class MockHttpServletRequest extends
+      SingletonTypeInjectableProvider<Context, HttpServletRequest>
+  {
+    public MockHttpServletRequest()
+    {
+      super(
+          HttpServletRequest.class,
+          createMockRequest()
+      );
+    }
+
+    static HttpServletRequest createMockRequest()
+    {
+      HttpServletRequest mockRequest = EasyMock.createNiceMock(HttpServletRequest.class);
+      AuthenticationResult authenticationResult = new AuthenticationResult(
+          "druid",
+          "druid",
+          null,
+          Collections.emptyMap()
+      );
+
+      EasyMock.expect(mockRequest.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).anyTimes();
+      EasyMock.expect(mockRequest.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+              .andReturn(authenticationResult)
+              .anyTimes();
+      EasyMock.replay(mockRequest);
+      return mockRequest;
+    }
+  }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliRouter.java b/services/src/main/java/org/apache/druid/cli/CliRouter.java
index 43ab8b1..e81b0e8 100644
--- a/services/src/main/java/org/apache/druid/cli/CliRouter.java
+++ b/services/src/main/java/org/apache/druid/cli/CliRouter.java
@@ -20,7 +20,6 @@
 package org.apache.druid.cli;
 
 import com.google.common.collect.ImmutableList;
-import com.google.inject.Binder;
 import com.google.inject.Module;
 import com.google.inject.TypeLiteral;
 import com.google.inject.name.Names;
@@ -38,7 +37,7 @@ import org.apache.druid.guice.RouterProcessingModule;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.guice.http.JettyHttpClientModule;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.lookup.LookupModule;
+import org.apache.druid.query.lookup.LookupSerdeModule;
 import org.apache.druid.server.AsyncQueryForwardingServlet;
 import org.apache.druid.server.http.RouterResource;
 import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
@@ -80,44 +79,39 @@ public class CliRouter extends ServerRunnable
         new QueryRunnerFactoryModule(),
         new JettyHttpClientModule("druid.router.http", Router.class),
         JettyHttpClientModule.global(),
-        new Module()
-        {
-          @Override
-          public void configure(Binder binder)
-          {
-            binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/router");
-            binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8888);
-            binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9088);
+        binder -> {
+          binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/router");
+          binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8888);
+          binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9088);
 
-            JsonConfigProvider.bind(binder, "druid.router", TieredBrokerConfig.class);
-            JsonConfigProvider.bind(binder, "druid.router.avatica.balancer", AvaticaConnectionBalancer.class);
-            JsonConfigProvider.bind(binder, "druid.router.managementProxy", ManagementProxyConfig.class);
+          JsonConfigProvider.bind(binder, "druid.router", TieredBrokerConfig.class);
+          JsonConfigProvider.bind(binder, "druid.router.avatica.balancer", AvaticaConnectionBalancer.class);
+          JsonConfigProvider.bind(binder, "druid.router.managementProxy", ManagementProxyConfig.class);
 
-            binder.bind(CoordinatorRuleManager.class);
-            LifecycleModule.register(binder, CoordinatorRuleManager.class);
+          binder.bind(CoordinatorRuleManager.class);
+          LifecycleModule.register(binder, CoordinatorRuleManager.class);
 
-            binder.bind(TieredBrokerHostSelector.class).in(ManageLifecycle.class);
-            binder.bind(QueryHostFinder.class).in(LazySingleton.class);
-            binder.bind(new TypeLiteral<List<TieredBrokerSelectorStrategy>>() {})
-                  .toProvider(TieredBrokerSelectorStrategiesProvider.class)
-                  .in(LazySingleton.class);
+          binder.bind(TieredBrokerHostSelector.class).in(ManageLifecycle.class);
+          binder.bind(QueryHostFinder.class).in(LazySingleton.class);
+          binder.bind(new TypeLiteral<List<TieredBrokerSelectorStrategy>>() {})
+                .toProvider(TieredBrokerSelectorStrategiesProvider.class)
+                .in(LazySingleton.class);
 
-            binder.bind(QueryCountStatsProvider.class).to(AsyncQueryForwardingServlet.class).in(LazySingleton.class);
-            binder.bind(JettyServerInitializer.class).to(RouterJettyServerInitializer.class).in(LazySingleton.class);
+          binder.bind(QueryCountStatsProvider.class).to(AsyncQueryForwardingServlet.class).in(LazySingleton.class);
+          binder.bind(JettyServerInitializer.class).to(RouterJettyServerInitializer.class).in(LazySingleton.class);
 
-            Jerseys.addResource(binder, RouterResource.class);
+          Jerseys.addResource(binder, RouterResource.class);
 
-            LifecycleModule.register(binder, RouterResource.class);
-            LifecycleModule.register(binder, Server.class);
-            DiscoveryModule.register(binder, Self.class);
+          LifecycleModule.register(binder, RouterResource.class);
+          LifecycleModule.register(binder, Server.class);
+          DiscoveryModule.register(binder, Self.class);
 
-            bindAnnouncer(
-                binder,
-                DiscoverySideEffectsProvider.builder(NodeType.ROUTER).build()
-            );
-          }
+          bindAnnouncer(
+              binder,
+              DiscoverySideEffectsProvider.builder(NodeType.ROUTER).build()
+          );
         },
-        new LookupModule()
+        new LookupSerdeModule()
     );
   }
 }
diff --git a/services/src/main/java/org/apache/druid/cli/RouterJettyServerInitializer.java b/services/src/main/java/org/apache/druid/cli/RouterJettyServerInitializer.java
index 57b6cbb..559cade 100644
--- a/services/src/main/java/org/apache/druid/cli/RouterJettyServerInitializer.java
+++ b/services/src/main/java/org/apache/druid/cli/RouterJettyServerInitializer.java
@@ -113,7 +113,9 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
 
     root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
 
-    root.addServlet(buildServletHolder(asyncQueryForwardingServlet, routerHttpClientConfig), "/druid/v2/*");
+    ServletHolder queryServletHolder = buildServletHolder(asyncQueryForwardingServlet, routerHttpClientConfig);
+    root.addServlet(queryServletHolder, "/druid/v2/*");
+    root.addServlet(queryServletHolder, "/druid/v1/lookups/*");
 
     if (managementProxyConfig.isEnabled()) {
       ServletHolder managementForwardingServletHolder = buildServletHolder(
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java
index ae19d35..c4a2c69 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java
@@ -27,7 +27,7 @@ import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.math.expr.Expr;
-import org.apache.druid.query.lookup.LookupReferencesManager;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.query.lookup.RegisteredLookupExtractionFn;
 import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.expression.OperatorConversions;
@@ -44,12 +44,12 @@ public class QueryLookupOperatorConversion implements SqlOperatorConversion
       .functionCategory(SqlFunctionCategory.STRING)
       .build();
 
-  private final LookupReferencesManager lookupReferencesManager;
+  private final LookupExtractorFactoryContainerProvider lookupExtractorFactoryContainerProvider;
 
   @Inject
-  public QueryLookupOperatorConversion(final LookupReferencesManager lookupReferencesManager)
+  public QueryLookupOperatorConversion(final LookupExtractorFactoryContainerProvider lookupExtractorFactoryContainerProvider)
   {
-    this.lookupReferencesManager = lookupReferencesManager;
+    this.lookupExtractorFactoryContainerProvider = lookupExtractorFactoryContainerProvider;
   }
 
   @Override
@@ -77,7 +77,7 @@ public class QueryLookupOperatorConversion implements SqlOperatorConversion
           if (arg.isSimpleExtraction() && lookupNameExpr.isLiteral()) {
             return arg.getSimpleExtraction().cascade(
                 new RegisteredLookupExtractionFn(
-                    lookupReferencesManager,
+                    lookupExtractorFactoryContainerProvider,
                     (String) lookupNameExpr.getLiteralValue(),
                     false,
                     null,
diff --git a/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java b/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java
index 88c8b36..fc7e0da 100644
--- a/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java
+++ b/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java
@@ -71,7 +71,7 @@ public class SqlModule implements Module
       // Add empty SqlAggregator binder.
       Multibinder.newSetBinder(binder, SqlAggregator.class);
 
-      // QueryLookupOperatorConversion isn't in DruidOperatorTable since it needs a LookupReferencesManager injected.
+      // QueryLookupOperatorConversion isn't in DruidOperatorTable since it needs a LookupExtractorFactoryContainerProvider injected.
       SqlBindings.addOperatorConversion(binder, QueryLookupOperatorConversion.class);
 
       if (isJsonOverHttpEnabled()) {
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index d692d1b..2c09568 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -75,7 +75,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
 import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
 import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
-import org.apache.druid.query.lookup.LookupReferencesManager;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
 import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
 import org.apache.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
@@ -225,9 +225,9 @@ public class CalciteTests
       (Module) binder -> {
         binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
 
-        // This Module is just to get a LookupReferencesManager with a usable "lookyloo" lookup.
+        // This Module is just to get a LookupExtractorFactoryContainerProvider with a usable "lookyloo" lookup.
 
-        binder.bind(LookupReferencesManager.class).toInstance(
+        binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(
             LookupEnabledTestExprMacroTable.createTestLookupReferencesManager(
                 ImmutableMap.of(
                     "a", "xa",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org