Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/08/07 17:23:17 UTC

[GitHub] gianm closed pull request #5108: Cache: Add maxEntrySize config, make groupBy cacheable by default.

URL: https://github.com/apache/incubator-druid/pull/5108

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md
index 2d442bff848..36953966a30 100644
--- a/docs/content/configuration/broker.md
+++ b/docs/content/configuration/broker.md
@@ -113,8 +113,9 @@ You can optionally only configure caching to be enabled on the broker by setting
 |`druid.broker.cache.useResultLevelCache`|true, false|Enable result level caching on the broker.|false|
 |`druid.broker.cache.populateResultLevelCache`|true, false|Populate the result level cache on the broker.|false|
 |`druid.broker.cache.resultLevelCacheLimit`|positive integer|Maximum size of query response that can be cached.|`Integer.MAX_VALUE`|
-|`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
+|`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["select"]`|
 |`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with more segments than this number will not attempt to fetch from cache at the broker level, leaving potential caching fetches (and cache result merging) to the historicals|`Integer.MAX_VALUE`|
+|`druid.broker.cache.maxEntrySize`|positive integer or -1|Maximum size of an individual cache entry (processed results for one segment), in bytes, or -1 for unlimited.|`1000000` (1MB)|
 
 See [cache configuration](caching.html) for how to configure cache settings.
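
 As a concrete illustration of the new `maxEntrySize` option, a broker could cap per-segment cache entries like this (a minimal runtime.properties sketch; the 5 MB figure is an arbitrary example, not a recommendation):

     druid.broker.cache.useCache=true
     druid.broker.cache.populateCache=true
     # Skip caching any single segment's results larger than 5 MB; -1 disables the limit.
     druid.broker.cache.maxEntrySize=5000000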
 
diff --git a/docs/content/configuration/historical.md b/docs/content/configuration/historical.md
index 02dddb92e52..ae770eadaf3 100644
--- a/docs/content/configuration/historical.md
+++ b/docs/content/configuration/historical.md
@@ -97,7 +97,8 @@ You can optionally only configure caching to be enabled on the historical by set
 |--------|---------------|-----------|-------|
 |`druid.historical.cache.useCache`|true, false|Enable the cache on the historical.|false|
 |`druid.historical.cache.populateCache`|true, false|Populate the cache on the historical.|false|
-|`druid.historical.cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]|
+|`druid.historical.cache.unCacheable`|All druid query types|All query types to not cache.|["select"]|
+|`druid.historical.cache.maxEntrySize`|positive integer or -1|Maximum size of an individual cache entry (processed results for one segment), in bytes, or -1 for unlimited.|`1000000` (1MB)|
 
 
 See [cache configuration](caching.html) for how to configure cache settings.
diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index d4ca2dc3079..3d9f2622afe 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -353,16 +353,21 @@ You can enable caching of results at the broker, historical, or realtime level u
 |<code>druid.(broker&#124;historical&#124;realtime).cache.unCacheable</code>|All druid query types|All query types to not cache.|["groupBy", "select"]|
 |<code>druid.(broker&#124;historical&#124;realtime).cache.useCache</code>|true, false|Whether to use cache for getting query results.|false|
 |<code>druid.(broker&#124;historical&#124;realtime).cache.populateCache</code>|true, false|Whether to populate cache.|false|
+|<code>druid.(broker&#124;historical&#124;realtime).cache.maxEntrySize</code>|positive integer or -1|Maximum size of an individual cache entry (processed results for one segment), in bytes, or -1 for unlimited.|`-1`|
 
 #### Local Cache
 
+<div class="note caution">
+DEPRECATED: Use the Caffeine cache instead.
+</div>
+
 |Property|Description|Default|
 |--------|-----------|-------|
 |`druid.cache.sizeInBytes`|Maximum cache size in bytes. You must set this if you enabled populateCache/useCache, or else cache size of zero wouldn't really cache anything.|0|
 |`druid.cache.initialSize`|Initial size of the hashtable backing the cache.|500000|
 |`druid.cache.logEvictionCount`|If non-zero, log cache eviction every `logEvictionCount` items.|0|
 
-#### Memcache
+#### Memcached
 
 |Property|Description|Default|
 |--------|-----------|-------|
@@ -372,6 +377,22 @@ You can enable caching of results at the broker, historical, or realtime level u
 |`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)|
 |`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|
 
+#### Caffeine Cache
+
+A highly performant local cache implementation for Druid based on [Caffeine](https://github.com/ben-manes/caffeine). Requires JRE 8u60 or higher if using `COMMON_FJP`.
+
+Below are the configuration options known to this module:
+
+|`runtime.properties`|Description|Default|
+|--------------------|-----------|-------|
+|`druid.cache.type`| Set this to `caffeine`|`local`|
+|`druid.cache.sizeInBytes`|The maximum size of the cache in bytes on heap.|None (unlimited)|
+|`druid.cache.expireAfter`|The time (in ms) since last access after which a cache entry may be expired|None (no time limit)|
+|`druid.cache.cacheExecutorFactory`|The executor factory to use for Caffeine maintenance. One of `COMMON_FJP`, `SINGLE_THREAD`, or `SAME_THREAD`|ForkJoinPool common pool (`COMMON_FJP`)|
+|`druid.cache.evictOnClose`|Whether closing a namespace (e.g., removing a segment from a node) should eagerly evict the associated cache values|`false`|
+
+See the [Caching documentation](caching.html) for more detail.
+
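
 For example, a node could opt into the Caffeine cache with a size bound and idle expiry as follows (an illustrative runtime.properties sketch; the size and expiry values are placeholders, not recommendations):

     druid.cache.type=caffeine
     # Bound the on-heap cache at 128 MB.
     druid.cache.sizeInBytes=134217728
     # Expire entries one hour (in ms) after last access.
     druid.cache.expireAfter=3600000
     druid.cache.cacheExecutorFactory=COMMON_FJP
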
 ### Indexing Service Discovery
 
 This config is used to find the [Indexing Service](../design/indexing-service.html) using Curator service discovery. Only required if you are actually running an indexing service.
diff --git a/docs/content/configuration/realtime.md b/docs/content/configuration/realtime.md
index 80fe9553fef..b75fca5879a 100644
--- a/docs/content/configuration/realtime.md
+++ b/docs/content/configuration/realtime.md
@@ -72,6 +72,7 @@ You can optionally configure caching to be enabled on the realtime node by setti
 |--------|---------------|-----------|-------|
 |`druid.realtime.cache.useCache`|true, false|Enable the cache on the realtime.|false|
 |`druid.realtime.cache.populateCache`|true, false|Populate the cache on the realtime.|false|
-|`druid.realtime.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
+|`druid.realtime.cache.unCacheable`|All druid query types|All query types to not cache.|`["select"]`|
+|`druid.realtime.cache.maxEntrySize`|positive integer or -1|Maximum size of an individual cache entry (processed results for one segment), in bytes, or -1 for unlimited.|`1000000` (1MB)|
 
 See [cache configuration](caching.html) for how to configure cache settings.
diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md
index d2c802d0444..6dc3e0f9857 100644
--- a/docs/content/operations/metrics.md
+++ b/docs/content/operations/metrics.md
@@ -88,6 +88,9 @@ Available Metrics
 |`*/averageByte`|Average cache entry byte size.||Varies.|
 |`*/timeouts`|Number of cache timeouts.||0|
 |`*/errors`|Number of cache errors.||0|
+|`*/put/ok`|Number of new cache entries successfully cached.||Varies, but more than zero.|
+|`*/put/error`|Number of new cache entries that could not be cached due to errors.||Varies, but more than zero.|
+|`*/put/oversized`|Number of potential new cache entries that were skipped due to being too large (based on `druid.{broker,historical,realtime}.cache.maxEntrySize` properties).||Varies.|
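
 These `*/put/*` counters are emitted by `CacheMonitor` alongside the pre-existing cache stats; assuming the standard monitor-list property, the monitor is enabled with:

     druid.monitoring.monitors=["io.druid.client.cache.CacheMonitor"]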
 
 #### Memcached only metrics
 
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index d3a58240d0f..d37a877747f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -246,7 +246,8 @@ Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
         toolbox.getEmitter(),
         toolbox.getQueryExecutorService(),
         toolbox.getCache(),
-        toolbox.getCacheConfig()
+        toolbox.getCacheConfig(),
+        toolbox.getCachePopulatorStats()
     );
   }
 
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index c3553b2b952..053da151092 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -37,6 +37,7 @@
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.MapCache;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.FloatDimensionSchema;
@@ -2144,6 +2145,7 @@ public void close()
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
         new CacheConfig(),
+        new CachePopulatorStats(),
         testUtils.getTestIndexMergerV9(),
         EasyMock.createNiceMock(DruidNodeAnnouncer.class),
         EasyMock.createNiceMock(DruidNode.class),
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
index d63c2a1c641..e31969e2bae 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
@@ -27,6 +27,7 @@
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.inject.Provider;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.java.util.metrics.MonitorScheduler;
 import io.druid.client.cache.Cache;
@@ -89,6 +90,7 @@
   private final IndexIO indexIO;
   private final Cache cache;
   private final CacheConfig cacheConfig;
+  private final CachePopulatorStats cachePopulatorStats;
   private final IndexMergerV9 indexMergerV9;
   private final TaskReportFileWriter taskReportFileWriter;
 
@@ -117,6 +119,7 @@ public TaskToolbox(
       IndexIO indexIO,
       Cache cache,
       CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats,
       IndexMergerV9 indexMergerV9,
       DruidNodeAnnouncer druidNodeAnnouncer,
       DruidNode druidNode,
@@ -144,6 +147,7 @@ public TaskToolbox(
     this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
     this.cache = cache;
     this.cacheConfig = cacheConfig;
+    this.cachePopulatorStats = cachePopulatorStats;
     this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
     this.druidNodeAnnouncer = druidNodeAnnouncer;
     this.druidNode = druidNode;
@@ -268,6 +272,11 @@ public CacheConfig getCacheConfig()
     return cacheConfig;
   }
 
+  public CachePopulatorStats getCachePopulatorStats()
+  {
+    return cachePopulatorStats;
+  }
+
   public IndexMergerV9 getIndexMergerV9()
   {
     return indexMergerV9;
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
index 95f1b3238cb..f858761ad40 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
@@ -23,10 +23,9 @@
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
-import io.druid.java.util.emitter.service.ServiceEmitter;
-import io.druid.java.util.metrics.MonitorScheduler;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.discovery.DataNodeService;
 import io.druid.discovery.DruidNodeAnnouncer;
 import io.druid.discovery.LookupNodeService;
@@ -35,6 +34,8 @@
 import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.task.Task;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+import io.druid.java.util.metrics.MonitorScheduler;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
@@ -73,6 +74,7 @@
   private final IndexIO indexIO;
   private final Cache cache;
   private final CacheConfig cacheConfig;
+  private final CachePopulatorStats cachePopulatorStats;
   private final IndexMergerV9 indexMergerV9;
   private final DruidNodeAnnouncer druidNodeAnnouncer;
   private final DruidNode druidNode;
@@ -100,6 +102,7 @@ public TaskToolboxFactory(
       IndexIO indexIO,
       Cache cache,
       CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats,
       IndexMergerV9 indexMergerV9,
       DruidNodeAnnouncer druidNodeAnnouncer,
       @RemoteChatHandler DruidNode druidNode,
@@ -126,6 +129,7 @@ public TaskToolboxFactory(
     this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
     this.cache = cache;
     this.cacheConfig = cacheConfig;
+    this.cachePopulatorStats = cachePopulatorStats;
     this.indexMergerV9 = indexMergerV9;
     this.druidNodeAnnouncer = druidNodeAnnouncer;
     this.druidNode = druidNode;
@@ -157,6 +161,7 @@ public TaskToolbox build(Task task)
         indexIO,
         cache,
         cacheConfig,
+        cachePopulatorStats,
         indexMergerV9,
         druidNodeAnnouncer,
         druidNode,
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 5dcc35bde4a..70a98226c29 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -705,7 +705,8 @@ private static Appenderator newAppenderator(
         toolbox.getEmitter(),
         toolbox.getQueryExecutorService(),
         toolbox.getCache(),
-        toolbox.getCacheConfig()
+        toolbox.getCacheConfig(),
+        toolbox.getCachePopulatorStats()
     );
   }
 
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index 5ea6c081020..9f7c9583978 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -341,6 +341,7 @@ public String getVersion(final Interval interval)
         toolbox.getIndexIO(),
         toolbox.getCache(),
         toolbox.getCacheConfig(),
+        toolbox.getCachePopulatorStats(),
         toolbox.getObjectMapper()
     );
 
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java
index d557e8cba96..abd088f1400 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.ImmutableList;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.task.NoopTestTaskFileWriter;
@@ -111,6 +112,7 @@ public void setUp() throws IOException
         mockIndexIO,
         mockCache,
         mockCacheConfig,
+        new CachePopulatorStats(),
         mockIndexMergerV9,
         null,
         null,
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index 6d3b31676aa..82acfacf013 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -30,6 +30,7 @@
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.MapCache;
 import io.druid.common.config.NullHandling;
 import io.druid.data.input.Firehose;
@@ -1497,6 +1498,7 @@ public void close()
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
         new CacheConfig(),
+        new CachePopulatorStats(),
         testUtils.getTestIndexMergerV9(),
         EasyMock.createNiceMock(DruidNodeAnnouncer.class),
         EasyMock.createNiceMock(DruidNode.class),
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
index 99adee034b3..2288393bbe8 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
@@ -586,6 +586,7 @@ private static void assertIngestionSchema(
           indexIO,
           null,
           null,
+          null,
           new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance()),
           null,
           null,
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
index c05d2dc41dc..b15261c9eee 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
@@ -1514,6 +1514,7 @@ public void killAll()
         indexIO,
         null,
         null,
+        null,
         indexMergerV9,
         null,
         null,
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
index ac523e5039c..13a9eebff2c 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -31,6 +31,7 @@
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.MapCache;
 import io.druid.common.config.NullHandling;
 import io.druid.data.input.Firehose;
@@ -1077,6 +1078,7 @@ public void close()
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
         new CacheConfig(),
+        new CachePopulatorStats(),
         testUtils.getTestIndexMergerV9(),
         EasyMock.createNiceMock(DruidNodeAnnouncer.class),
         EasyMock.createNiceMock(DruidNode.class),
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java
index 2e816f546bc..900f48e5164 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java
@@ -252,6 +252,7 @@ public void cleanup(DataSegment segment)
             indexIO,
             null,
             null,
+            null,
             EasyMock.createMock(IndexMergerV9.class),
             null,
             null,
diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index 35a23ebadb5..750fe266658 100644
--- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -308,6 +308,7 @@ public DataSegment restore(DataSegment segment)
         INDEX_IO,
         null,
         null,
+        null,
         INDEX_MERGER_V9,
         null,
         null,
diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
index c8149e32d9d..469d4a867e4 100644
--- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
@@ -340,6 +340,7 @@ public TaskActionClient create(Task task)
           INDEX_IO,
           null,
           null,
+          null,
           INDEX_MERGER_V9,
           null,
           null,
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 9a95b879515..a5cd1251213 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -99,6 +99,7 @@ public void setup() throws IOException
         utils.getTestIndexIO(),
         null,
         null,
+        null,
         utils.getTestIndexMergerV9(),
         null,
         node,
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
index 6319a8686d2..7339ee559dc 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
@@ -34,6 +34,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.MapCache;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.FirehoseFactory;
@@ -606,6 +607,7 @@ public void unannounceSegments(Iterable<DataSegment> segments)
         INDEX_IO,
         MapCache.create(0),
         FireDepartmentTest.NO_CACHE_CONFIG,
+        new CachePopulatorStats(),
         INDEX_MERGER_V9,
         EasyMock.createNiceMock(DruidNodeAnnouncer.class),
         EasyMock.createNiceMock(DruidNode.class),
diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java
index cd7e85e4fce..ff528ea8d82 100644
--- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -125,6 +125,7 @@ private WorkerTaskManager createWorkerTaskManager()
                 indexIO,
                 null,
                 null,
+                null,
                 indexMergerV9,
                 null,
                 null,
diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
index 2b5374b1030..1af92ada5c6 100644
--- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -187,6 +187,7 @@ private WorkerTaskMonitor createTaskMonitor()
                 indexIO,
                 null,
                 null,
+                null,
                 indexMergerV9,
                 null,
                 null,
diff --git a/server/src/main/java/io/druid/client/CacheUtil.java b/server/src/main/java/io/druid/client/CacheUtil.java
index 78ba5e8ac84..622ec0c8db9 100644
--- a/server/src/main/java/io/druid/client/CacheUtil.java
+++ b/server/src/main/java/io/druid/client/CacheUtil.java
@@ -19,9 +19,6 @@
 
 package io.druid.client;
 
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Throwables;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
 import io.druid.java.util.common.StringUtils;
@@ -31,8 +28,6 @@
 import io.druid.query.SegmentDescriptor;
 import org.joda.time.Interval;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class CacheUtil
@@ -57,23 +52,6 @@
     );
   }
 
-  public static void populate(Cache cache, ObjectMapper mapper, Cache.NamedKey key, Iterable<Object> results)
-  {
-    try {
-      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-      try (JsonGenerator gen = mapper.getFactory().createGenerator(bytes)) {
-        for (Object result : results) {
-          gen.writeObject(result);
-        }
-      }
-
-      cache.put(key, bytes.toByteArray());
-    }
-    catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
   public static <T> boolean useCacheOnBrokers(
       Query<T> query,
       CacheStrategy<T, Object, Query<T>> strategy,
diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java
index 50f6389e2d4..d227dd4ace2 100644
--- a/server/src/main/java/io/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java
@@ -32,17 +32,12 @@
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulator;
 import io.druid.client.selector.QueryableDruidServer;
 import io.druid.client.selector.ServerSelector;
-import io.druid.guice.annotations.BackgroundCaching;
 import io.druid.guice.annotations.Smile;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.Pair;
@@ -88,8 +83,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
@@ -102,8 +95,8 @@
   private final TimelineServerView serverView;
   private final Cache cache;
   private final ObjectMapper objectMapper;
+  private final CachePopulator cachePopulator;
   private final CacheConfig cacheConfig;
-  private final ListeningExecutorService backgroundExecutorService;
 
   @Inject
   public CachingClusteredClient(
@@ -111,7 +104,7 @@ public CachingClusteredClient(
       TimelineServerView serverView,
       Cache cache,
       @Smile ObjectMapper objectMapper,
-      @BackgroundCaching ExecutorService backgroundExecutorService,
+      CachePopulator cachePopulator,
       CacheConfig cacheConfig
   )
   {
@@ -119,13 +112,13 @@ public CachingClusteredClient(
     this.serverView = serverView;
     this.cache = cache;
     this.objectMapper = objectMapper;
+    this.cachePopulator = cachePopulator;
     this.cacheConfig = cacheConfig;
-    this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService);
 
-    if (cacheConfig.isQueryCacheable(Query.GROUP_BY)) {
+    if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) {
       log.warn(
-          "Even though groupBy caching is enabled, v2 groupBys will not be cached. "
-          + "Consider disabling cache on your broker and enabling it on your data nodes to enable v2 groupBy caching."
+          "Even though groupBy caching is enabled in your configuration, v2 groupBys will not be cached on the broker. "
+          + "Consider enabling caching on your data nodes if it is not already enabled."
       );
     }
 
@@ -218,7 +211,7 @@ public CachingClusteredClient(
     private final boolean isBySegment;
     private final int uncoveredIntervalsLimit;
     private final Query<T> downstreamQuery;
-    private final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
+    private final Map<String, Cache.NamedKey> cachePopulatorKeyMap = Maps.newHashMap();
 
     SpecificQueryRunnable(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
     {
@@ -420,7 +413,7 @@ private String computeCurrentEtag(final Set<ServerToSegment> segments, @Nullable
         } else if (populateCache) {
           // otherwise, if populating cache, add segment to list of segments to cache
           final String segmentIdentifier = segment.getServer().getSegment().getIdentifier();
-          addCachePopulator(segmentCacheKey, segmentIdentifier, segmentQueryInterval);
+          addCachePopulatorKey(segmentCacheKey, segmentIdentifier, segmentQueryInterval);
         }
       });
       return alreadyCachedResults;
@@ -453,22 +446,22 @@ private String computeCurrentEtag(final Set<ServerToSegment> segments, @Nullable
       }
     }
 
-    private void addCachePopulator(
+    private void addCachePopulatorKey(
         Cache.NamedKey segmentCacheKey,
         String segmentIdentifier,
         Interval segmentQueryInterval
     )
     {
-      cachePopulatorMap.put(
+      cachePopulatorKeyMap.put(
           StringUtils.format("%s_%s", segmentIdentifier, segmentQueryInterval),
-          new CachePopulator(cache, objectMapper, segmentCacheKey)
+          segmentCacheKey
       );
     }
 
     @Nullable
-    private CachePopulator getCachePopulator(String segmentId, Interval segmentInterval)
+    private Cache.NamedKey getCachePopulatorKey(String segmentId, Interval segmentInterval)
     {
-      return cachePopulatorMap.get(StringUtils.format("%s_%s", segmentId, segmentInterval));
+      return cachePopulatorKeyMap.get(StringUtils.format("%s_%s", segmentId, segmentInterval));
     }
 
     private SortedMap<DruidServer, List<SegmentDescriptor>> groupSegmentsByServer(Set<ServerToSegment> segments)
@@ -601,27 +594,19 @@ private void addSequencesFromServer(
           responseContext
       );
       final Function<T, Object> cacheFn = strategy.prepareForSegmentLevelCache();
+
       return resultsBySegments
           .map(result -> {
             final BySegmentResultValueClass<T> resultsOfSegment = result.getValue();
-            final CachePopulator cachePopulator =
-                getCachePopulator(resultsOfSegment.getSegmentId(), resultsOfSegment.getInterval());
-            Sequence<T> res = Sequences
-                .simple(resultsOfSegment.getResults())
-                .map(r -> {
-                  if (cachePopulator != null) {
-                    // only compute cache data if populating cache
-                    cachePopulator.cacheFutures.add(backgroundExecutorService.submit(() -> cacheFn.apply(r)));
-                  }
-                  return r;
-                })
-                .map(
-                    toolChest.makePreComputeManipulatorFn(downstreamQuery, MetricManipulatorFns.deserializing())::apply
-                );
-            if (cachePopulator != null) {
-              res = res.withEffect(cachePopulator::populate, MoreExecutors.sameThreadExecutor());
+            final Cache.NamedKey cachePopulatorKey =
+                getCachePopulatorKey(resultsOfSegment.getSegmentId(), resultsOfSegment.getInterval());
+            Sequence<T> res = Sequences.simple(resultsOfSegment.getResults());
+            if (cachePopulatorKey != null) {
+              res = cachePopulator.wrap(res, cacheFn::apply, cache, cachePopulatorKey);
             }
-            return res;
+            return res.map(
+                toolChest.makePreComputeManipulatorFn(downstreamQuery, MetricManipulatorFns.deserializing())::apply
+            );
           })
           .flatMerge(seq -> seq, query.getResultOrdering());
     }
@@ -644,43 +629,4 @@ SegmentDescriptor getSegmentDescriptor()
       return rhs;
     }
   }
-
-  private class CachePopulator
-  {
-    private final Cache cache;
-    private final ObjectMapper mapper;
-    private final Cache.NamedKey key;
-    private final ConcurrentLinkedQueue<ListenableFuture<Object>> cacheFutures = new ConcurrentLinkedQueue<>();
-
-    CachePopulator(Cache cache, ObjectMapper mapper, Cache.NamedKey key)
-    {
-      this.cache = cache;
-      this.mapper = mapper;
-      this.key = key;
-    }
-
-    public void populate()
-    {
-      Futures.addCallback(
-          Futures.allAsList(cacheFutures),
-          new FutureCallback<List<Object>>()
-          {
-            @Override
-            public void onSuccess(List<Object> cacheData)
-            {
-              CacheUtil.populate(cache, mapper, key, cacheData);
-              // Help out GC by making sure all references are gone
-              cacheFutures.clear();
-            }
-
-            @Override
-            public void onFailure(Throwable throwable)
-            {
-              log.error(throwable, "Background caching failed");
-            }
-          },
-          backgroundExecutorService
-      );
-    }
-  }
 }
diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java
index df9e8856709..422ea651578 100644
--- a/server/src/main/java/io/druid/client/CachingQueryRunner.java
+++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java
@@ -23,18 +23,13 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulator;
 import io.druid.java.util.common.guava.BaseSequence;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
-import io.druid.java.util.common.logger.Logger;
 import io.druid.query.CacheStrategy;
 import io.druid.query.Query;
 import io.druid.query.QueryPlus;
@@ -46,20 +41,19 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 
 public class CachingQueryRunner<T> implements QueryRunner<T>
 {
-  private static final Logger log = new Logger(CachingQueryRunner.class);
   private final String segmentIdentifier;
   private final SegmentDescriptor segmentDescriptor;
   private final QueryRunner<T> base;
   private final QueryToolChest toolChest;
   private final Cache cache;
   private final ObjectMapper mapper;
+  private final CachePopulator cachePopulator;
   private final CacheConfig cacheConfig;
-  private final ListeningExecutorService backgroundExecutorService;
 
   public CachingQueryRunner(
       String segmentIdentifier,
@@ -68,7 +62,7 @@ public CachingQueryRunner(
       Cache cache,
       QueryToolChest toolchest,
       QueryRunner<T> base,
-      ExecutorService backgroundExecutorService,
+      CachePopulator cachePopulator,
       CacheConfig cacheConfig
   )
   {
@@ -78,7 +72,7 @@ public CachingQueryRunner(
     this.toolChest = toolchest;
     this.cache = cache;
     this.mapper = mapper;
-    this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService);
+    this.cachePopulator = cachePopulator;
     this.cacheConfig = cacheConfig;
   }
 
@@ -140,56 +134,10 @@ public void cleanup(Iterator<T> iterFromMake)
       }
     }
 
-    final Collection<ListenableFuture<?>> cacheFutures = Collections.synchronizedList(Lists.newLinkedList());
+    final Collection<ListenableFuture<?>> cacheFutures = Collections.synchronizedList(new LinkedList<>());
     if (populateCache) {
       final Function cacheFn = strategy.prepareForSegmentLevelCache();
-
-      return Sequences.withEffect(
-          Sequences.map(
-              base.run(queryPlus, responseContext),
-              new Function<T, T>()
-              {
-                @Override
-                public T apply(final T input)
-                {
-                  final SettableFuture<Object> future = SettableFuture.create();
-                  cacheFutures.add(future);
-                  backgroundExecutorService.submit(
-                      new Runnable()
-                      {
-                        @Override
-                        public void run()
-                        {
-                          try {
-                            future.set(cacheFn.apply(input));
-                          }
-                          catch (Exception e) {
-                            // if there is exception, should setException to quit the caching processing
-                            future.setException(e);
-                          }
-                        }
-                      }
-                  );
-                  return input;
-                }
-              }
-          ),
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              try {
-                CacheUtil.populate(cache, mapper, key, Futures.allAsList(cacheFutures).get());
-              }
-              catch (Exception e) {
-                log.error(e, "Error while getting future for cache task");
-                throw Throwables.propagate(e);
-              }
-            }
-          },
-          backgroundExecutorService
-      );
+      return cachePopulator.wrap(base.run(queryPlus, responseContext), value -> cacheFn.apply(value), cache, key);
     } else {
       return base.run(queryPlus, responseContext);
     }
diff --git a/server/src/main/java/io/druid/client/cache/BackgroundCachePopulator.java b/server/src/main/java/io/druid/client/cache/BackgroundCachePopulator.java
new file mode 100644
index 00000000000..d20fcf4f7e6
--- /dev/null
+++ b/server/src/main/java/io/druid/client/cache/BackgroundCachePopulator.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.java.util.common.guava.Sequence;
+import io.druid.java.util.common.guava.Sequences;
+import io.druid.java.util.common.logger.Logger;
+
+import java.io.ByteArrayOutputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+
+public class BackgroundCachePopulator implements CachePopulator
+{
+  private static final Logger log = new Logger(BackgroundCachePopulator.class);
+
+  private final ListeningExecutorService exec;
+  private final ObjectMapper objectMapper;
+  private final CachePopulatorStats cachePopulatorStats;
+  private final long maxEntrySize;
+
+  public BackgroundCachePopulator(
+      final ExecutorService exec,
+      final ObjectMapper objectMapper,
+      final CachePopulatorStats cachePopulatorStats,
+      final long maxEntrySize
+  )
+  {
+    this.exec = MoreExecutors.listeningDecorator(exec);
+    this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
+    this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");
+    this.maxEntrySize = maxEntrySize;
+  }
+
+  @Override
+  public <T, CacheType> Sequence<T> wrap(
+      final Sequence<T> sequence,
+      final Function<T, CacheType> cacheFn,
+      final Cache cache,
+      final Cache.NamedKey cacheKey
+  )
+  {
+    final List<ListenableFuture<CacheType>> cacheFutures = new LinkedList<>();
+
+    final Sequence<T> wrappedSequence = Sequences.map(
+        sequence,
+        input -> {
+          cacheFutures.add(exec.submit(() -> cacheFn.apply(input)));
+          return input;
+        }
+    );
+
+    return Sequences.withEffect(
+        wrappedSequence,
+        () -> {
+          Futures.addCallback(
+              Futures.allAsList(cacheFutures),
+              new FutureCallback<List<CacheType>>()
+              {
+                @Override
+                public void onSuccess(List<CacheType> results)
+                {
+                  populateCache(cache, cacheKey, results);
+                  // Help out GC by making sure all references are gone
+                  cacheFutures.clear();
+                }
+
+                @Override
+                public void onFailure(Throwable t)
+                {
+                  log.error(t, "Background caching failed");
+                }
+              },
+              exec
+          );
+        },
+        MoreExecutors.sameThreadExecutor()
+    );
+  }
+
+  private <CacheType> void populateCache(
+      final Cache cache,
+      final Cache.NamedKey cacheKey,
+      final List<CacheType> results
+  )
+  {
+    try {
+      final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+
+      try (JsonGenerator gen = objectMapper.getFactory().createGenerator(bytes)) {
+        for (CacheType result : results) {
+          gen.writeObject(result);
+
+          if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
+            cachePopulatorStats.incrementOversized();
+            return;
+          }
+        }
+      }
+
+      if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
+        cachePopulatorStats.incrementOversized();
+        return;
+      }
+
+      cache.put(cacheKey, bytes.toByteArray());
+      cachePopulatorStats.incrementOk();
+    }
+    catch (Exception e) {
+      log.warn(e, "Could not populate cache");
+      cachePopulatorStats.incrementError();
+    }
+  }
+}
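
A minimal wiring sketch for the class above. This is hypothetical: real deployments obtain the executor, the Smile-configured ObjectMapper, and the stats object through Guice, and "results", "cacheFn", "cache", and "key" stand in for values supplied by the query pipeline:

    // Hypothetical standalone construction; values are illustrative only.
    ExecutorService exec = Executors.newFixedThreadPool(2);
    CachePopulatorStats stats = new CachePopulatorStats();
    CachePopulator populator =
        new BackgroundCachePopulator(exec, new ObjectMapper(), stats, 1_000_000);
    // Each result is serialized on "exec"; once the sequence completes, the entry
    // is written to the cache unless it exceeds maxEntrySize (1 MB here).
    Sequence<Object> wrapped = populator.wrap(results, cacheFn, cache, key);
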
diff --git a/server/src/main/java/io/druid/client/cache/CacheConfig.java b/server/src/main/java/io/druid/client/cache/CacheConfig.java
index 4b9f02033ca..7d8e35fd7c3 100644
--- a/server/src/main/java/io/druid/client/cache/CacheConfig.java
+++ b/server/src/main/java/io/druid/client/cache/CacheConfig.java
@@ -20,10 +20,10 @@
 package io.druid.client.cache;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
 import io.druid.query.Query;
 
 import javax.validation.constraints.Min;
-import java.util.Arrays;
 import java.util.List;
 
 public class CacheConfig
@@ -52,10 +52,13 @@
   private int cacheBulkMergeLimit = Integer.MAX_VALUE;
 
   @JsonProperty
-  private int resultLevelCacheLimit = Integer.MAX_VALUE;
+  private int maxEntrySize = 1_000_000;
+
+  @JsonProperty
+  private List<String> unCacheable = ImmutableList.of(Query.SELECT);
 
   @JsonProperty
-  private List<String> unCacheable = Arrays.asList(Query.GROUP_BY, Query.SELECT);
+  private int resultLevelCacheLimit = Integer.MAX_VALUE;
 
   public boolean isPopulateCache()
   {
@@ -87,6 +90,11 @@ public int getCacheBulkMergeLimit()
     return cacheBulkMergeLimit;
   }
 
+  public int getMaxEntrySize()
+  {
+    return maxEntrySize;
+  }
+
   public int getResultLevelCacheLimit()
   {
     return resultLevelCacheLimit;
diff --git a/server/src/main/java/io/druid/client/cache/CacheMonitor.java b/server/src/main/java/io/druid/client/cache/CacheMonitor.java
index 515555abee2..576f9dbfd2d 100644
--- a/server/src/main/java/io/druid/client/cache/CacheMonitor.java
+++ b/server/src/main/java/io/druid/client/cache/CacheMonitor.java
@@ -20,27 +20,24 @@
 package io.druid.client.cache;
 
 import com.google.inject.Inject;
+import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.java.util.emitter.service.ServiceMetricEvent;
 import io.druid.java.util.metrics.AbstractMonitor;
-import io.druid.java.util.common.StringUtils;
 
 public class CacheMonitor extends AbstractMonitor
 {
   // package private for tests
   volatile Cache cache;
 
+  private final CachePopulatorStats cachePopulatorStats;
   private volatile CacheStats prevCacheStats = null;
+  private volatile CachePopulatorStats.Snapshot prevCachePopulatorStats = null;
 
-  public CacheMonitor()
+  @Inject
+  public CacheMonitor(final CachePopulatorStats cachePopulatorStats)
   {
-  }
-
-  public CacheMonitor(
-      Cache cache
-  )
-  {
-    this.cache = cache;
+    this.cachePopulatorStats = cachePopulatorStats;
   }
 
   // make it possible to enable CacheMonitor even if cache is not bound
@@ -58,10 +55,16 @@ public boolean doMonitor(ServiceEmitter emitter)
       final CacheStats currCacheStats = cache.getStats();
       final CacheStats deltaCacheStats = currCacheStats.delta(prevCacheStats);
 
+      final CachePopulatorStats.Snapshot currCachePopulatorStats = cachePopulatorStats.snapshot();
+      final CachePopulatorStats.Snapshot deltaCachePopulatorStats = currCachePopulatorStats.delta(
+          prevCachePopulatorStats
+      );
+
       final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
-      emitStats(emitter, "query/cache/delta", deltaCacheStats, builder);
-      emitStats(emitter, "query/cache/total", currCacheStats, builder);
+      emitStats(emitter, "query/cache/delta", deltaCachePopulatorStats, deltaCacheStats, builder);
+      emitStats(emitter, "query/cache/total", currCachePopulatorStats, currCacheStats, builder);
 
+      prevCachePopulatorStats = currCachePopulatorStats;
       prevCacheStats = currCacheStats;
 
       // Any custom cache statistics that need monitoring
@@ -71,13 +74,15 @@ public boolean doMonitor(ServiceEmitter emitter)
   }
 
   private void emitStats(
-      ServiceEmitter emitter,
+      final ServiceEmitter emitter,
       final String metricPrefix,
-      CacheStats cacheStats,
-      ServiceMetricEvent.Builder builder
+      final CachePopulatorStats.Snapshot cachePopulatorStats,
+      final CacheStats cacheStats,
+      final ServiceMetricEvent.Builder builder
   )
   {
     if (cache != null) {
+      // Cache stats.
       emitter.emit(builder.build(StringUtils.format("%s/numEntries", metricPrefix), cacheStats.getNumEntries()));
       emitter.emit(builder.build(StringUtils.format("%s/sizeBytes", metricPrefix), cacheStats.getSizeInBytes()));
       emitter.emit(builder.build(StringUtils.format("%s/hits", metricPrefix), cacheStats.getNumHits()));
@@ -87,6 +92,13 @@ private void emitStats(
       emitter.emit(builder.build(StringUtils.format("%s/averageBytes", metricPrefix), cacheStats.averageBytes()));
       emitter.emit(builder.build(StringUtils.format("%s/timeouts", metricPrefix), cacheStats.getNumTimeouts()));
       emitter.emit(builder.build(StringUtils.format("%s/errors", metricPrefix), cacheStats.getNumErrors()));
+
+      // Cache populator stats.
+      emitter.emit(builder.build(StringUtils.format("%s/put/ok", metricPrefix), cachePopulatorStats.getNumOk()));
+      emitter.emit(builder.build(StringUtils.format("%s/put/error", metricPrefix), cachePopulatorStats.getNumError()));
+      emitter.emit(
+          builder.build(StringUtils.format("%s/put/oversized", metricPrefix), cachePopulatorStats.getNumOversized())
+      );
     }
   }
 }
diff --git a/processing/src/main/java/io/druid/guice/annotations/BackgroundCaching.java b/server/src/main/java/io/druid/client/cache/CachePopulator.java
similarity index 66%
rename from processing/src/main/java/io/druid/guice/annotations/BackgroundCaching.java
rename to server/src/main/java/io/druid/client/cache/CachePopulator.java
index abd75149b76..5c8fc88776f 100644
--- a/processing/src/main/java/io/druid/guice/annotations/BackgroundCaching.java
+++ b/server/src/main/java/io/druid/client/cache/CachePopulator.java
@@ -17,21 +17,18 @@
  * under the License.
  */
 
-package io.druid.guice.annotations;
+package io.druid.client.cache;
 
-import com.google.inject.BindingAnnotation;
+import io.druid.java.util.common.guava.Sequence;
 
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
+import java.util.function.Function;
 
-/**
- *
- */
-@BindingAnnotation
-@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
-@Retention(RetentionPolicy.RUNTIME)
-public @interface BackgroundCaching
+public interface CachePopulator
 {
+  <T, CacheType> Sequence<T> wrap(
+      Sequence<T> sequence,
+      Function<T, CacheType> cacheFn,
+      Cache cache,
+      Cache.NamedKey cacheKey
+  );
 }
diff --git a/server/src/main/java/io/druid/client/cache/CachePopulatorStats.java b/server/src/main/java/io/druid/client/cache/CachePopulatorStats.java
new file mode 100644
index 00000000000..f1bcd30d8dd
--- /dev/null
+++ b/server/src/main/java/io/druid/client/cache/CachePopulatorStats.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ */
+public class CachePopulatorStats
+{
+  private final AtomicLong okCounter = new AtomicLong();
+  private final AtomicLong errorCounter = new AtomicLong();
+  private final AtomicLong oversizedCounter = new AtomicLong();
+
+  public void incrementOk()
+  {
+    okCounter.incrementAndGet();
+  }
+
+  public void incrementError()
+  {
+    errorCounter.incrementAndGet();
+  }
+
+  public void incrementOversized()
+  {
+    oversizedCounter.incrementAndGet();
+  }
+
+  public Snapshot snapshot()
+  {
+    return new Snapshot(
+        okCounter.get(),
+        errorCounter.get(),
+        oversizedCounter.get()
+    );
+  }
+
+  public static class Snapshot
+  {
+    private final long numOk;
+    private final long numError;
+    private final long numOversized;
+
+    Snapshot(final long numOk, final long numError, final long numOversized)
+    {
+      this.numOk = numOk;
+      this.numError = numError;
+      this.numOversized = numOversized;
+    }
+
+    public long getNumOk()
+    {
+      return numOk;
+    }
+
+    public long getNumError()
+    {
+      return numError;
+    }
+
+    public long getNumOversized()
+    {
+      return numOversized;
+    }
+
+    public Snapshot delta(Snapshot oldSnapshot)
+    {
+      if (oldSnapshot == null) {
+        return this;
+      } else {
+        return new Snapshot(
+            numOk - oldSnapshot.numOk,
+            numError - oldSnapshot.numError,
+            numOversized - oldSnapshot.numOversized
+        );
+      }
+    }
+  }
+}
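
The snapshot/delta pair exists so CacheMonitor can emit per-period counts from monotonically increasing counters; a small usage sketch:

    CachePopulatorStats stats = new CachePopulatorStats();
    stats.incrementOk();
    CachePopulatorStats.Snapshot prev = stats.snapshot();              // numOk = 1
    stats.incrementOversized();
    // delta() subtracts the earlier snapshot: numOk = 0, numOversized = 1.
    CachePopulatorStats.Snapshot delta = stats.snapshot().delta(prev);
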
diff --git a/server/src/main/java/io/druid/client/cache/ForegroundCachePopulator.java b/server/src/main/java/io/druid/client/cache/ForegroundCachePopulator.java
new file mode 100644
index 00000000000..7e7fd5e386e
--- /dev/null
+++ b/server/src/main/java/io/druid/client/cache/ForegroundCachePopulator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import io.druid.java.util.common.guava.Sequence;
+import io.druid.java.util.common.guava.SequenceWrapper;
+import io.druid.java.util.common.guava.Sequences;
+import io.druid.java.util.common.logger.Logger;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+public class ForegroundCachePopulator implements CachePopulator
+{
+  private static final Logger log = new Logger(ForegroundCachePopulator.class);
+
+  private final Object lock = new Object();
+  private final ObjectMapper objectMapper;
+  private final CachePopulatorStats cachePopulatorStats;
+  private final long maxEntrySize;
+
+  public ForegroundCachePopulator(
+      final ObjectMapper objectMapper,
+      final CachePopulatorStats cachePopulatorStats,
+      final long maxEntrySize
+  )
+  {
+    this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
+    this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");
+    this.maxEntrySize = maxEntrySize;
+  }
+
+  @Override
+  public <T, CacheType> Sequence<T> wrap(
+      final Sequence<T> sequence,
+      final Function<T, CacheType> cacheFn,
+      final Cache cache,
+      final Cache.NamedKey cacheKey
+  )
+  {
+    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    final AtomicBoolean tooBig = new AtomicBoolean(false);
+    final JsonGenerator jsonGenerator;
+
+    try {
+      jsonGenerator = objectMapper.getFactory().createGenerator(bytes);
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return Sequences.wrap(
+        Sequences.map(
+            sequence,
+            input -> {
+              if (!tooBig.get()) {
+                synchronized (lock) {
+                  try {
+                    jsonGenerator.writeObject(cacheFn.apply(input));
+
+                    // We don't flush jsonGenerator before checking the size, so the check may undercount by
+                    // whatever Jackson has buffered; that should be fine, since Jackson buffers are typically
+                    // just a few KB and we don't want to waste cycles flushing.
+                    if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
+                      tooBig.set(true);
+                    }
+                  }
+                  catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                }
+              }
+
+              return input;
+            }
+        ),
+        new SequenceWrapper()
+        {
+          @Override
+          public void after(final boolean isDone, final Throwable thrown) throws Exception
+          {
+            synchronized (lock) {
+              jsonGenerator.close();
+
+              if (isDone) {
+                // Check tooBig, then check maxEntrySize one more time, after closing/flushing jsonGenerator.
+                if (tooBig.get() || (maxEntrySize > 0 && bytes.size() > maxEntrySize)) {
+                  cachePopulatorStats.incrementOversized();
+                  return;
+                }
+
+                try {
+                  cache.put(cacheKey, bytes.toByteArray());
+                  cachePopulatorStats.incrementOk();
+                }
+                catch (Exception e) {
+                  log.warn(e, "Unable to write to cache");
+                  cachePopulatorStats.incrementError();
+                }
+              }
+            }
+          }
+        }
+    );
+  }
+}
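
A minimal usage sketch for the class above (hypothetical snippet, not part of
this PR; MapCache, Sequences, and Cache.NamedKey are exercised the same way in
the tests later in this diff). wrap() passes each result through unchanged
while serializing it on the consuming thread; the assembled entry is written
to the cache only after the sequence completes successfully:

    // Hypothetical caller of ForegroundCachePopulator.wrap().
    final CachePopulator populator =
        new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 1_000_000);
    final Cache cache = MapCache.create(1024 * 1024);
    final Cache.NamedKey key = new Cache.NamedKey("example", new byte[]{0x01});

    final Sequence<String> results = populator.wrap(
        Sequences.simple(ImmutableList.of("foo", "bar")),
        value -> ImmutableMap.of("s", value), // cacheFn: result -> cacheable form
        cache,
        key
    );

    // Nothing is cached until the sequence is consumed; the put happens in
    // SequenceWrapper.after(), and oversized entries are dropped and counted.
    results.toList();
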
diff --git a/server/src/main/java/io/druid/client/cache/HybridCache.java b/server/src/main/java/io/druid/client/cache/HybridCache.java
index f732bee7c92..c25beff3da1 100644
--- a/server/src/main/java/io/druid/client/cache/HybridCache.java
+++ b/server/src/main/java/io/druid/client/cache/HybridCache.java
@@ -21,8 +21,8 @@
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
diff --git a/server/src/main/java/io/druid/guice/CacheModule.java b/server/src/main/java/io/druid/guice/CacheModule.java
index 19d6e570c1b..488e52ca50c 100644
--- a/server/src/main/java/io/druid/guice/CacheModule.java
+++ b/server/src/main/java/io/druid/guice/CacheModule.java
@@ -24,6 +24,7 @@
 import com.google.inject.Module;
 import com.google.inject.name.Names;
 import io.druid.client.cache.Cache;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.CacheProvider;
 import io.druid.guice.annotations.Global;
 
@@ -48,6 +49,7 @@ public CacheModule(String prefix)
   public void configure(Binder binder)
   {
     binder.bind(Cache.class).toProvider(Key.get(CacheProvider.class, Global.class)).in(ManageLifecycle.class);
+    binder.bind(CachePopulatorStats.class).in(LazySingleton.class);
     JsonConfigProvider.bind(binder, prefix, CacheProvider.class, Global.class);
 
     binder.install(new HybridCacheModule(prefix));
diff --git a/server/src/main/java/io/druid/guice/DruidProcessingModule.java b/server/src/main/java/io/druid/guice/DruidProcessingModule.java
index c0c328737c1..ff9859d08ca 100644
--- a/server/src/main/java/io/druid/guice/DruidProcessingModule.java
+++ b/server/src/main/java/io/druid/guice/DruidProcessingModule.java
@@ -19,22 +19,26 @@
 
 package io.druid.guice;
 
-import com.google.common.util.concurrent.MoreExecutors;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Binder;
 import com.google.inject.Module;
 import com.google.inject.Provides;
 import com.google.inject.ProvisionException;
+import io.druid.client.cache.BackgroundCachePopulator;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulator;
+import io.druid.client.cache.CachePopulatorStats;
+import io.druid.client.cache.ForegroundCachePopulator;
 import io.druid.collections.BlockingPool;
 import io.druid.collections.DefaultBlockingPool;
 import io.druid.collections.NonBlockingPool;
 import io.druid.collections.StupidPool;
 import io.druid.common.utils.VMUtils;
-import io.druid.guice.annotations.BackgroundCaching;
 import io.druid.guice.annotations.Global;
 import io.druid.guice.annotations.Merging;
 import io.druid.guice.annotations.Processing;
+import io.druid.guice.annotations.Smile;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.concurrent.ExecutorServiceConfig;
 import io.druid.java.util.common.lifecycle.Lifecycle;
@@ -64,14 +68,15 @@ public void configure(Binder binder)
   }
 
   @Provides
-  @BackgroundCaching
   @LazySingleton
-  public ExecutorService getBackgroundExecutorService(
+  public CachePopulator getCachePopulator(
+      @Smile ObjectMapper smileMapper,
+      CachePopulatorStats cachePopulatorStats,
       CacheConfig cacheConfig
   )
   {
     if (cacheConfig.getNumBackgroundThreads() > 0) {
-      return Executors.newFixedThreadPool(
+      final ExecutorService exec = Executors.newFixedThreadPool(
           cacheConfig.getNumBackgroundThreads(),
           new ThreadFactoryBuilder()
               .setNameFormat("background-cacher-%d")
@@ -79,8 +84,10 @@ public ExecutorService getBackgroundExecutorService(
               .setPriority(Thread.MIN_PRIORITY)
               .build()
       );
+
+      return new BackgroundCachePopulator(exec, smileMapper, cachePopulatorStats, cacheConfig.getMaxEntrySize());
     } else {
-      return MoreExecutors.sameThreadExecutor();
+      return new ForegroundCachePopulator(smileMapper, cachePopulatorStats, cacheConfig.getMaxEntrySize());
     }
   }
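
The provider above is where the cache config takes effect: a positive
numBackgroundThreads selects a BackgroundCachePopulator backed by a fixed pool
of MIN_PRIORITY "background-cacher" threads, and anything else falls back to a
ForegroundCachePopulator that serializes on the query thread. An illustrative
runtime.properties sketch (assuming the usual per-service cache prefix; the
values are examples only):

    # Populate the segment cache on 4 background threads instead of query threads.
    druid.historical.cache.numBackgroundThreads=4
    # Skip caching any single entry larger than 1MB; -1 means unlimited.
    druid.historical.cache.maxEntrySize=1000000
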
 
diff --git a/server/src/main/java/io/druid/guice/RouterProcessingModule.java b/server/src/main/java/io/druid/guice/RouterProcessingModule.java
index 17d132645a6..4b2bbc1710a 100644
--- a/server/src/main/java/io/druid/guice/RouterProcessingModule.java
+++ b/server/src/main/java/io/druid/guice/RouterProcessingModule.java
@@ -22,16 +22,14 @@
 import com.google.inject.Binder;
 import com.google.inject.Module;
 import com.google.inject.Provides;
-import io.druid.client.cache.CacheConfig;
 import io.druid.collections.BlockingPool;
 import io.druid.collections.DummyBlockingPool;
 import io.druid.collections.DummyNonBlockingPool;
 import io.druid.collections.NonBlockingPool;
-import io.druid.java.util.common.concurrent.Execs;
-import io.druid.guice.annotations.BackgroundCaching;
 import io.druid.guice.annotations.Global;
 import io.druid.guice.annotations.Merging;
 import io.druid.guice.annotations.Processing;
+import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.concurrent.ExecutorServiceConfig;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.query.DruidProcessingConfig;
@@ -58,20 +56,6 @@ public void configure(Binder binder)
     MetricsModule.register(binder, ExecutorServiceMonitor.class);
   }
 
-  @Provides
-  @BackgroundCaching
-  @LazySingleton
-  public ExecutorService getBackgroundExecutorService(CacheConfig cacheConfig)
-  {
-    if (cacheConfig.getNumBackgroundThreads() > 0) {
-      log.error(
-          "numBackgroundThreads[%d] configured, that is ignored on Router",
-          cacheConfig.getNumBackgroundThreads()
-      );
-    }
-    return Execs.dummy();
-  }
-
   @Provides
   @Processing
   @ManageLifecycle
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index c0169e74371..5b79626880d 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -40,6 +40,7 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.common.guava.ThreadRenamingCallable;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
@@ -166,7 +167,8 @@
       IndexIO indexIO,
       IndexMerger indexMerger,
       Cache cache,
-      CacheConfig cacheConfig
+      CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats
   )
   {
     this.schema = Preconditions.checkNotNull(schema, "schema");
@@ -186,7 +188,8 @@
         conglomerate,
         queryExecutorService,
         Preconditions.checkNotNull(cache, "cache"),
-        cacheConfig
+        cacheConfig,
+        cachePopulatorStats
     );
     maxBytesTuningConfig = TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory());
     log.info("Created Appenderator for dataSource[%s].", schema.getDataSource());
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java
index 7651c40c54f..1e22cdd8513 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.query.QueryRunnerFactoryConglomerate;
@@ -52,7 +53,8 @@ public static Appenderator createRealtime(
       ServiceEmitter emitter,
       ExecutorService queryExecutorService,
       Cache cache,
-      CacheConfig cacheConfig
+      CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats
   )
   {
     return new AppenderatorImpl(
@@ -68,7 +70,8 @@ public static Appenderator createRealtime(
         indexIO,
         indexMerger,
         cache,
-        cacheConfig
+        cacheConfig,
+        cachePopulatorStats
     );
   }
 
@@ -120,6 +123,7 @@ public void unannounceSegments(Iterable<DataSegment> segments)
         indexIO,
         indexMerger,
         null,
+        null,
         null
     );
   }
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
index b6b188e801e..7d027d64aa2 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
@@ -21,11 +21,11 @@
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import io.druid.java.util.emitter.service.ServiceEmitter;
-
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.guice.annotations.Processing;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMerger;
@@ -51,6 +51,7 @@
   private final IndexMerger indexMerger;
   private final Cache cache;
   private final CacheConfig cacheConfig;
+  private final CachePopulatorStats cachePopulatorStats;
 
   public DefaultRealtimeAppenderatorFactory(
       @JacksonInject ServiceEmitter emitter,
@@ -62,7 +63,8 @@ public DefaultRealtimeAppenderatorFactory(
       @JacksonInject IndexIO indexIO,
       @JacksonInject IndexMerger indexMerger,
       @JacksonInject Cache cache,
-      @JacksonInject CacheConfig cacheConfig
+      @JacksonInject CacheConfig cacheConfig,
+      @JacksonInject CachePopulatorStats cachePopulatorStats
   )
   {
     this.emitter = emitter;
@@ -75,6 +77,7 @@ public DefaultRealtimeAppenderatorFactory(
     this.indexMerger = indexMerger;
     this.cache = cache;
     this.cacheConfig = cacheConfig;
+    this.cachePopulatorStats = cachePopulatorStats;
   }
 
   @Override
@@ -103,7 +106,8 @@ public Appenderator build(
         emitter,
         queryExecutorService,
         cache,
-        cacheConfig
+        cacheConfig,
+        cachePopulatorStats
     );
   }
 
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 6901ace8493..ef18be2dec9 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -24,15 +24,17 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.client.CachingQueryRunner;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
+import io.druid.client.cache.ForegroundCachePopulator;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.guava.CloseQuietly;
 import io.druid.java.util.common.guava.FunctionalIterable;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.query.BySegmentQueryRunner;
 import io.druid.query.CPUTimeMetricQueryRunner;
 import io.druid.query.MetricsEmittingQueryRunner;
@@ -76,6 +78,7 @@
   private final ExecutorService queryExecutorService;
   private final Cache cache;
   private final CacheConfig cacheConfig;
+  private final CachePopulatorStats cachePopulatorStats;
 
   public SinkQuerySegmentWalker(
       String dataSource,
@@ -85,7 +88,8 @@ public SinkQuerySegmentWalker(
       QueryRunnerFactoryConglomerate conglomerate,
       ExecutorService queryExecutorService,
       Cache cache,
-      CacheConfig cacheConfig
+      CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats
   )
   {
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
@@ -96,6 +100,7 @@ public SinkQuerySegmentWalker(
     this.queryExecutorService = Preconditions.checkNotNull(queryExecutorService, "queryExecutorService");
     this.cache = Preconditions.checkNotNull(cache, "cache");
     this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
+    this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");
 
     if (!cache.isLocal()) {
       log.warn("Configured cache[%s] is not local, caching will not be enabled.", cache.getClass().getName());
@@ -235,7 +240,12 @@ public SegmentDescriptor apply(final PartitionChunk<Sink> chunk)
                                                             cache,
                                                             toolChest,
                                                             baseRunner,
-                                                            MoreExecutors.sameThreadExecutor(),
+                                                            // Always populate in foreground regardless of config
+                                                            new ForegroundCachePopulator(
+                                                                objectMapper,
+                                                                cachePopulatorStats,
+                                                                cacheConfig.getMaxEntrySize()
+                                                            ),
                                                             cacheConfig
                                                         );
                                                       } else {
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java
index aefbdf0c3a7..2bdf3a3dba0 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.Lists;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.common.guava.ThreadRenamingCallable;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.StringUtils;
@@ -73,6 +74,7 @@ public FlushingPlumber(
       IndexIO indexIO,
       Cache cache,
       CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats,
       ObjectMapper objectMapper
 
   )
@@ -92,6 +94,7 @@ public FlushingPlumber(
         indexIO,
         cache,
         cacheConfig,
+        cachePopulatorStats,
         objectMapper
     );
 
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
index ea5e85b9514..244b2f09feb 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
@@ -24,10 +24,11 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
-import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.guice.annotations.Processing;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
@@ -57,6 +58,7 @@
   private final IndexIO indexIO;
   private final Cache cache;
   private final CacheConfig cacheConfig;
+  private final CachePopulatorStats cachePopulatorStats;
   private final ObjectMapper objectMapper;
 
   @JsonCreator
@@ -70,6 +72,7 @@ public FlushingPlumberSchool(
       @JacksonInject IndexIO indexIO,
       @JacksonInject Cache cache,
       @JacksonInject CacheConfig cacheConfig,
+      @JacksonInject CachePopulatorStats cachePopulatorStats,
       @JacksonInject ObjectMapper objectMapper
   )
   {
@@ -85,6 +88,7 @@ public FlushingPlumberSchool(
         indexIO,
         cache,
         cacheConfig,
+        cachePopulatorStats,
         objectMapper
     );
 
@@ -97,6 +101,7 @@ public FlushingPlumberSchool(
     this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
     this.cache = cache;
     this.cacheConfig = cacheConfig;
+    this.cachePopulatorStats = cachePopulatorStats;
     this.objectMapper = objectMapper;
   }
 
@@ -122,6 +127,7 @@ public Plumber findPlumber(
         indexIO,
         cache,
         cacheConfig,
+        cachePopulatorStats,
         objectMapper
     );
   }
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
index 1e7a5c8008c..bfef2c623cb 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -32,6 +32,7 @@
 import com.google.common.primitives.Ints;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.common.guava.ThreadRenamingCallable;
 import io.druid.common.guava.ThreadRenamingRunnable;
 import io.druid.common.utils.VMUtils;
@@ -146,6 +147,7 @@ public RealtimePlumber(
       IndexIO indexIO,
       Cache cache,
       CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats,
       ObjectMapper objectMapper
   )
   {
@@ -168,7 +170,8 @@ public RealtimePlumber(
         conglomerate,
         queryExecutorService,
         cache,
-        cacheConfig
+        cacheConfig,
+        cachePopulatorStats
     );
 
     log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy());
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
index ec3ed406af7..6ad7acadb32 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
@@ -23,10 +23,11 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
-import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.guice.annotations.Processing;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
@@ -54,6 +55,7 @@
   private final IndexIO indexIO;
   private final Cache cache;
   private final CacheConfig cacheConfig;
+  private final CachePopulatorStats cachePopulatorStats;
   private final ObjectMapper objectMapper;
 
   @JsonCreator
@@ -69,6 +71,7 @@ public RealtimePlumberSchool(
       @JacksonInject IndexIO indexIO,
       @JacksonInject Cache cache,
       @JacksonInject CacheConfig cacheConfig,
+      @JacksonInject CachePopulatorStats cachePopulatorStats,
       @JacksonInject ObjectMapper objectMapper
   )
   {
@@ -84,6 +87,7 @@ public RealtimePlumberSchool(
 
     this.cache = cache;
     this.cacheConfig = cacheConfig;
+    this.cachePopulatorStats = cachePopulatorStats;
     this.objectMapper = objectMapper;
   }
 
@@ -111,6 +115,7 @@ public Plumber findPlumber(
         indexIO,
         cache,
         cacheConfig,
+        cachePopulatorStats,
         objectMapper
     );
   }
diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java
index aac2e860234..4a134d7f11d 100644
--- a/server/src/main/java/io/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java
@@ -26,7 +26,7 @@
 import io.druid.client.CachingQueryRunner;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
-import io.druid.guice.annotations.BackgroundCaching;
+import io.druid.client.cache.CachePopulator;
 import io.druid.guice.annotations.Processing;
 import io.druid.guice.annotations.Smile;
 import io.druid.java.util.common.ISE;
@@ -77,7 +77,7 @@
   private final QueryRunnerFactoryConglomerate conglomerate;
   private final ServiceEmitter emitter;
   private final ExecutorService exec;
-  private final ExecutorService cachingExec;
+  private final CachePopulator cachePopulator;
   private final Cache cache;
   private final ObjectMapper objectMapper;
   private final CacheConfig cacheConfig;
@@ -89,7 +89,7 @@ public ServerManager(
       QueryRunnerFactoryConglomerate conglomerate,
       ServiceEmitter emitter,
       @Processing ExecutorService exec,
-      @BackgroundCaching ExecutorService cachingExec,
+      CachePopulator cachePopulator,
       @Smile ObjectMapper objectMapper,
       Cache cache,
       CacheConfig cacheConfig,
@@ -101,7 +101,7 @@ public ServerManager(
     this.emitter = emitter;
 
     this.exec = exec;
-    this.cachingExec = cachingExec;
+    this.cachePopulator = cachePopulator;
     this.cache = cache;
     this.objectMapper = objectMapper;
 
@@ -298,7 +298,7 @@ private String getDataSourceName(DataSource dataSource)
         cache,
         toolChest,
         metricsEmittingQueryRunnerInner,
-        cachingExec,
+        cachePopulator,
         cacheConfig
     );
 
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
index f50eea7e97d..67ac01c0130 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -21,10 +21,11 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import io.druid.client.cache.Cache;
+import io.druid.client.cache.CachePopulator;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
+import io.druid.client.cache.ForegroundCachePopulator;
 import io.druid.client.cache.MapCache;
 import io.druid.client.selector.QueryableDruidServer;
 import io.druid.client.selector.ServerSelector;
@@ -74,7 +75,9 @@ public void setUp()
     timeline = new VersionedIntervalTimeline<>(Ordering.natural());
     serverView = EasyMock.createNiceMock(TimelineServerView.class);
     cache = MapCache.create(100000);
-    client = makeClient(MoreExecutors.sameThreadExecutor());
+    client = makeClient(
+        new ForegroundCachePopulator(CachingClusteredClientTest.jsonMapper, new CachePopulatorStats(), -1)
+    );
   }
 
   @Test
@@ -199,13 +202,13 @@ public QueryableDruidServer pick(
     ));
   }
 
-  protected CachingClusteredClient makeClient(final ListeningExecutorService backgroundExecutorService)
+  protected CachingClusteredClient makeClient(final CachePopulator cachePopulator)
   {
-    return makeClient(backgroundExecutorService, cache, 10);
+    return makeClient(cachePopulator, cache, 10);
   }
 
   protected CachingClusteredClient makeClient(
-      final ListeningExecutorService backgroundExecutorService,
+      final CachePopulator cachePopulator,
       final Cache cache,
       final int mergeLimit
   )
@@ -245,7 +248,7 @@ public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback c
         },
         cache,
         CachingClusteredClientTest.jsonMapper,
-        backgroundExecutorService,
+        cachePopulator,
         new CacheConfig()
         {
           @Override
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index 31f68ffbb64..6143dad7902 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -43,8 +43,12 @@
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import io.druid.client.cache.BackgroundCachePopulator;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulator;
+import io.druid.client.cache.CachePopulatorStats;
+import io.druid.client.cache.ForegroundCachePopulator;
 import io.druid.client.cache.MapCache;
 import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
 import io.druid.client.selector.QueryableDruidServer;
@@ -330,7 +334,7 @@ public void setUp()
     timeline = new VersionedIntervalTimeline<>(Ordering.natural());
     serverView = EasyMock.createNiceMock(TimelineServerView.class);
     cache = MapCache.create(100000);
-    client = makeClient(MoreExecutors.sameThreadExecutor());
+    client = makeClient(new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1));
 
     servers = new DruidServer[]{
         new DruidServer("test1", "test1", null, 10, ServerType.HISTORICAL, "bye", 0),
@@ -422,7 +426,14 @@ public void onFailure(Throwable t)
       }
     };
 
-    client = makeClient(randomizingExecutorService);
+    client = makeClient(
+        new BackgroundCachePopulator(
+            randomizingExecutorService,
+            jsonMapper,
+            new CachePopulatorStats(),
+            -1
+        )
+    );
 
     // callback to be run every time a query run is complete, to ensure all background
     // caching tasks are executed, and cache is populated before we move onto the next query
@@ -579,7 +590,7 @@ public void testCachingOverBulkLimitEnforcesLimit()
             .andReturn(ImmutableMap.of())
             .once();
     EasyMock.replay(cache);
-    client = makeClient(MoreExecutors.sameThreadExecutor(), cache, limit);
+    client = makeClient(new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1), cache, limit);
     final DruidServer lastServer = servers[random.nextInt(servers.length)];
     final DataSegment dataSegment = EasyMock.createNiceMock(DataSegment.class);
     EasyMock.expect(dataSegment.getIdentifier()).andReturn(DATA_SOURCE).anyTimes();
@@ -604,7 +615,7 @@ public void testCachingOverBulkLimitEnforcesLimit()
             .andReturn(ImmutableMap.of())
             .once();
     EasyMock.replay(cache);
-    client = makeClient(MoreExecutors.sameThreadExecutor(), cache, 0);
+    client = makeClient(new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1), cache, 0);
     getDefaultQueryRunner().run(QueryPlus.wrap(query), context);
     EasyMock.verify(cache);
     EasyMock.verify(dataSegment);
@@ -2630,13 +2641,13 @@ private void runWithMocks(Runnable toRun, Object... mocks)
     EasyMock.reset(mocks);
   }
 
-  protected CachingClusteredClient makeClient(final ListeningExecutorService backgroundExecutorService)
+  protected CachingClusteredClient makeClient(final CachePopulator cachePopulator)
   {
-    return makeClient(backgroundExecutorService, cache, 10);
+    return makeClient(cachePopulator, cache, 10);
   }
 
   protected CachingClusteredClient makeClient(
-      final ListeningExecutorService backgroundExecutorService,
+      final CachePopulator cachePopulator,
       final Cache cache,
       final int mergeLimit
   )
@@ -2676,7 +2687,7 @@ public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback c
         },
         cache,
         jsonMapper,
-        backgroundExecutorService,
+        cachePopulator,
         new CacheConfig()
         {
           @Override
diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
index ab2bf224836..012fe2b9be2 100644
--- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
+++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
@@ -19,20 +19,26 @@
 
 package io.druid.client;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.client.cache.BackgroundCachePopulator;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulator;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.CacheStats;
+import io.druid.client.cache.ForegroundCachePopulator;
 import io.druid.client.cache.MapCache;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.SequenceWrapper;
@@ -63,15 +69,15 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -98,14 +104,22 @@
       DateTimes.of("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
   };
 
-  private ExecutorService backgroundExecutorService;
+  private ObjectMapper objectMapper;
+  private CachePopulator cachePopulator;
 
   public CachingQueryRunnerTest(int numBackgroundThreads)
   {
+    objectMapper = new DefaultObjectMapper();
+
     if (numBackgroundThreads > 0) {
-      backgroundExecutorService = Executors.newFixedThreadPool(numBackgroundThreads);
+      cachePopulator = new BackgroundCachePopulator(
+          Execs.multiThreaded(numBackgroundThreads, "CachingQueryRunnerTest-%d"),
+          objectMapper,
+          new CachePopulatorStats(),
+          -1
+      );
     } else {
-      backgroundExecutorService = MoreExecutors.sameThreadExecutor();
+      cachePopulator = new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), -1);
     }
   }
 
@@ -274,7 +288,7 @@ public Sequence run(QueryPlus queryPlus, Map responseContext)
             return resultSeq;
           }
         },
-        backgroundExecutorService,
+        cachePopulator,
         new CacheConfig()
         {
           @Override
@@ -331,7 +345,7 @@ private void testUseCache(
       List<Result> expectedResults,
       Query query,
       QueryToolChest toolchest
-  )
+  ) throws IOException
   {
     DefaultObjectMapper objectMapper = new DefaultObjectMapper();
     String segmentIdentifier = "segment";
@@ -345,12 +359,7 @@ private void testUseCache(
     );
 
     Cache cache = MapCache.create(1024 * 1024);
-    CacheUtil.populate(
-        cache,
-        objectMapper,
-        cacheKey,
-        Iterables.transform(expectedResults, cacheStrategy.prepareForSegmentLevelCache())
-    );
+    cache.put(cacheKey, toByteArray(Iterables.transform(expectedResults, cacheStrategy.prepareForSegmentLevelCache())));
 
     CachingQueryRunner runner = new CachingQueryRunner(
         segmentIdentifier,
@@ -367,7 +376,7 @@ public Sequence run(QueryPlus queryPlus, Map responseContext)
             return Sequences.empty();
           }
         },
-        backgroundExecutorService,
+        cachePopulator,
         new CacheConfig()
         {
           @Override
@@ -434,6 +443,19 @@ public boolean isUseCache()
     return retVal;
   }
 
+  private <T> byte[] toByteArray(final Iterable<T> results) throws IOException
+  {
+    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+
+    try (JsonGenerator gen = objectMapper.getFactory().createGenerator(bytes)) {
+      for (T result : results) {
+        gen.writeObject(result);
+      }
+    }
+
+    return bytes.toByteArray();
+  }
+
   private static class AssertingClosable implements Closeable
   {
 
diff --git a/server/src/test/java/io/druid/client/cache/CachePopulatorTest.java b/server/src/test/java/io/druid/client/cache/CachePopulatorTest.java
new file mode 100644
index 00000000000..b855e20288a
--- /dev/null
+++ b/server/src/test/java/io/druid/client/cache/CachePopulatorTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.guava.Sequences;
+import io.druid.java.util.common.jackson.JacksonUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+public class CachePopulatorTest
+{
+  private final ExecutorService exec = Execs.multiThreaded(2, "cache-populator-test-%d");
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final Cache cache = new MapCache(new ByteCountingLRUMap(Long.MAX_VALUE));
+  private final CachePopulatorStats stats = new CachePopulatorStats();
+
+  @After
+  public void tearDown()
+  {
+    exec.shutdownNow();
+  }
+
+  @Test
+  public void testForegroundPopulator()
+  {
+    final CachePopulator populator = new ForegroundCachePopulator(objectMapper, stats, -1);
+    final List<String> strings = ImmutableList.of("foo", "bar");
+
+    Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1), strings));
+    Assert.assertEquals(strings, readFromCache(makeKey(1)));
+    Assert.assertEquals(1, stats.snapshot().getNumOk());
+    Assert.assertEquals(0, stats.snapshot().getNumError());
+    Assert.assertEquals(0, stats.snapshot().getNumOversized());
+  }
+
+  @Test
+  public void testForegroundPopulatorMaxEntrySize()
+  {
+    final CachePopulator populator = new ForegroundCachePopulator(objectMapper, stats, 30);
+    final List<String> strings = ImmutableList.of("foo", "bar");
+    final List<String> strings2 = ImmutableList.of("foo", "baralararararararaarararararaa");
+
+    Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1), strings));
+    Assert.assertEquals(strings, readFromCache(makeKey(1)));
+    Assert.assertEquals(strings2, wrapAndReturn(populator, makeKey(2), strings2));
+    Assert.assertNull(readFromCache(makeKey(2)));
+
+    Assert.assertEquals(1, stats.snapshot().getNumOk());
+    Assert.assertEquals(0, stats.snapshot().getNumError());
+    Assert.assertEquals(1, stats.snapshot().getNumOversized());
+  }
+
+  @Test(timeout = 60000L)
+  public void testBackgroundPopulator() throws InterruptedException
+  {
+    final CachePopulator populator = new BackgroundCachePopulator(exec, objectMapper, stats, -1);
+    final List<String> strings = ImmutableList.of("foo", "bar");
+
+    Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1), strings));
+
+    // Wait for background updates to happen.
+    while (cache.getStats().getNumEntries() < 1) {
+      Thread.sleep(100);
+    }
+
+    Assert.assertEquals(strings, readFromCache(makeKey(1)));
+    Assert.assertEquals(1, stats.snapshot().getNumOk());
+    Assert.assertEquals(0, stats.snapshot().getNumError());
+    Assert.assertEquals(0, stats.snapshot().getNumOversized());
+  }
+
+  @Test(timeout = 60000L)
+  public void testBackgroundPopulatorMaxEntrySize() throws InterruptedException
+  {
+    final CachePopulator populator = new BackgroundCachePopulator(exec, objectMapper, stats, 30);
+    final List<String> strings = ImmutableList.of("foo", "bar");
+    final List<String> strings2 = ImmutableList.of("foo", "baralararararararaarararararaa");
+
+    Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1), strings));
+    Assert.assertEquals(strings2, wrapAndReturn(populator, makeKey(2), strings2));
+
+    // Wait for background updates to happen.
+    while (cache.getStats().getNumEntries() < 1 || stats.snapshot().getNumOversized() < 1) {
+      Thread.sleep(100);
+    }
+
+    Assert.assertEquals(strings, readFromCache(makeKey(1)));
+    Assert.assertNull(readFromCache(makeKey(2)));
+    Assert.assertEquals(1, stats.snapshot().getNumOk());
+    Assert.assertEquals(0, stats.snapshot().getNumError());
+    Assert.assertEquals(1, stats.snapshot().getNumOversized());
+  }
+
+  private static Cache.NamedKey makeKey(final int n)
+  {
+    return new Cache.NamedKey("test", Ints.toByteArray(n));
+  }
+
+  private List<String> wrapAndReturn(
+      final CachePopulator populator,
+      final Cache.NamedKey key,
+      final List<String> strings
+  )
+  {
+    return populator.wrap(Sequences.simple(strings), s -> ImmutableMap.of("s", s), cache, key).toList();
+  }
+
+  private List<String> readFromCache(final Cache.NamedKey key)
+  {
+    final byte[] bytes = cache.get(key);
+    if (bytes == null) {
+      return null;
+    }
+
+    try (
+        final MappingIterator<Map<String, String>> iterator = objectMapper.readValues(
+            objectMapper.getFactory().createParser(bytes),
+            JacksonUtils.TYPE_REFERENCE_MAP_STRING_STRING
+        )
+    ) {
+      final List<Map<String, String>> retVal = new ArrayList<>();
+      Iterators.addAll(retVal, iterator);
+
+      // Undo map-wrapping that was done in wrapAndReturn.
+      return retVal.stream().map(m -> m.get("s")).collect(Collectors.toList());
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
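
A note on the size arithmetic in the maxEntrySize tests above (byte counts are
approximate and depend on the serialized form): each element is cached as a
small JSON object, so the first entry serializes to {"s":"foo"}{"s":"bar"},
about 22 bytes, which fits under the 30-byte limit, while the second entry
appends {"s":"baralararararararaarararararaa"}, putting it near 50 bytes in
total, so it is dropped and counted as oversized instead of being cached.
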
diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
index 7b503aa2348..f88f5bffc6c 100644
--- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.MapCache;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.JSONParseSpec;
@@ -115,6 +116,7 @@ public void testSerde() throws Exception
                 TestHelper.getTestIndexIO(OffHeapMemorySegmentWriteOutMediumFactory.instance()),
                 MapCache.create(0),
                 NO_CACHE_CONFIG,
+                new CachePopulatorStats(),
                 TestHelper.makeJsonMapper()
 
             ),
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
index 3c675c69c17..c5522d9fbfe 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.MapCache;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.JSONParseSpec;
@@ -273,7 +274,8 @@ public void unannounceSegments(Iterable<DataSegment> segments)
         emitter,
         queryExecutor,
         MapCache.create(2048),
-        new CacheConfig()
+        new CacheConfig(),
+        new CachePopulatorStats()
     );
   }
 
diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index a5ad8a4633e..ef964178fe8 100644
--- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -26,6 +26,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.MapCache;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
@@ -223,6 +224,7 @@ public void setUp() throws Exception
         TestHelper.getTestIndexIO(segmentWriteOutMediumFactory),
         MapCache.create(0),
         FireDepartmentTest.NO_CACHE_CONFIG,
+        new CachePopulatorStats(),
         TestHelper.makeJsonMapper()
     );
 
diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java
index 1287748e596..df4aebf0a6b 100644
--- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java
+++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java
@@ -26,8 +26,9 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.MoreExecutors;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
+import io.druid.client.cache.ForegroundCachePopulator;
 import io.druid.client.cache.LocalCacheProvider;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.IAE;
@@ -153,7 +154,7 @@ public void cleanup(DataSegment segment)
         },
         new NoopServiceEmitter(),
         serverManagerExec,
-        MoreExecutors.sameThreadExecutor(),
+        new ForegroundCachePopulator(new DefaultObjectMapper(), new CachePopulatorStats(), -1),
         new DefaultObjectMapper(),
         new LocalCacheProvider().get(),
         new CacheConfig(),


 
