Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/07/25 02:28:47 UTC

[GitHub] drcrallen closed pull request #6014: Optionally refuse to consume new data until the prior chunk is being consumed

drcrallen closed pull request #6014: Optionally refuse to consume new data until the prior chunk is being consumed
URL: https://github.com/apache/incubator-druid/pull/6014
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the original
diff of a foreign pull request once it is merged, the diff is reproduced below
for the sake of provenance:

diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md
index d4e2be28f12..8aee6744c39 100644
--- a/docs/content/querying/query-context.md
+++ b/docs/content/querying/query-context.md
@@ -22,6 +22,7 @@ The query context is used for various query configuration parameters. The follow
 |chunkPeriod      | `P0D` (off)                            | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. |
 |serializeDateTimeAsLong| `false`       | If true, DateTime is serialized as long in the result returned by broker and the data transportation between broker and compute node|
 |serializeDateTimeAsLongInner| `false`  | If true, DateTime is serialized as long in the data transportation between broker and compute node|
+|enableBrokerBackpressure|`false`|If true, the broker will refuse to accept new HTTP chunks from the queried nodes until it is ready to process more chunks. This can reduce heap memory pressure on the broker for large query results, at the potential expense of query latency.|
 
 In addition, some query types offer context parameters specific to that query type.
 
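For illustration, a query context carrying the parameters documented above might
look like the following plain-Java sketch. The key names come from the table; the
class, method, and chosen values are hypothetical and not part of the patch:

    import java.util.HashMap;
    import java.util.Map;

    public class QueryContextExample
    {
      public static Map<String, Object> exampleContext()
      {
        // Keys match the documented query context parameters.
        final Map<String, Object> context = new HashMap<>();
        context.put("chunkPeriod", "P1M");              // split long-interval queries into month-sized chunks
        context.put("serializeDateTimeAsLong", false);  // leave DateTime serialization unchanged
        context.put("enableBrokerBackpressure", true);  // opt in to the new backpressure behavior
        return context;
      }
    }
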
diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java
index 4252ad4537d..a8083e01fa6 100644
--- a/processing/src/main/java/io/druid/query/QueryContexts.java
+++ b/processing/src/main/java/io/druid/query/QueryContexts.java
@@ -33,12 +33,14 @@
   public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes";
   public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout";
   public static final String CHUNK_PERIOD_KEY = "chunkPeriod";
+  public static final String ENABLE_BROKER_BACKPRESSURE = "enableBrokerBackpressure";
 
   public static final boolean DEFAULT_BY_SEGMENT = false;
   public static final boolean DEFAULT_POPULATE_CACHE = true;
   public static final boolean DEFAULT_USE_CACHE = true;
   public static final boolean DEFAULT_POPULATE_RESULTLEVEL_CACHE = true;
   public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true;
+  public static final boolean DEFAULT_ENABLE_BROKER_BACKPRESSURE = false;
   public static final int DEFAULT_PRIORITY = 0;
   public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0;
   public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes
@@ -170,7 +172,6 @@
   }
 
 
-
   public static <T> long getMaxScatterGatherBytes(Query<T> query)
   {
     return parseLong(query, MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE);
@@ -210,6 +211,11 @@
     return defaultTimeout;
   }
 
+  public static <T> boolean isEnableBrokerBackpressure(Query<T> query)
+  {
+    return parseBoolean(query, ENABLE_BROKER_BACKPRESSURE, DEFAULT_ENABLE_BROKER_BACKPRESSURE);
+  }
+
   static <T> long parseLong(Query<T> query, String key, long defaultValue)
   {
     final Object val = query.getContextValue(key);
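
The new isEnableBrokerBackpressure helper follows the same pattern as the existing
QueryContexts getters: look the key up in the query's context and fall back to the
default when it is absent. A standalone sketch of that defaulting behavior follows,
using a plain map in place of a Druid Query; the class name and the String/Boolean
coercion are assumptions for illustration, since parseBoolean's body is not shown
in this diff:

    import java.util.Map;

    final class ContextFlags
    {
      // Missing key => default; otherwise accept a Boolean or a String such as "true"/"false".
      static boolean parseBoolean(Map<String, Object> context, String key, boolean defaultValue)
      {
        final Object val = context.get(key);
        if (val == null) {
          return defaultValue;
        }
        return val instanceof String ? Boolean.parseBoolean((String) val) : (Boolean) val;
      }

      static boolean isEnableBrokerBackpressure(Map<String, Object> context)
      {
        return parseBoolean(context, "enableBrokerBackpressure", false);
      }
    }
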
diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java
index 72607f91bf4..f64a9b1555e 100644
--- a/server/src/main/java/io/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/io/druid/client/DirectDruidClient.java
@@ -80,18 +80,19 @@
 import java.util.Enumeration;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.LinkedTransferQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TransferQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
 
 /**
  */
@@ -164,19 +165,27 @@ public int getNumOpenConnections()
   public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> context)
   {
     final Query<T> query = queryPlus.getQuery();
-    QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
-    boolean isBySegment = QueryContexts.isBySegment(query);
-
-    Pair<JavaType, JavaType> types = typesMap.get(query.getClass());
-    if (types == null) {
-      final TypeFactory typeFactory = objectMapper.getTypeFactory();
-      JavaType baseType = typeFactory.constructType(toolChest.getResultTypeReference());
-      JavaType bySegmentType = typeFactory.constructParametricType(
-          Result.class, typeFactory.constructParametricType(BySegmentResultValueClass.class, baseType)
-      );
-      types = Pair.of(baseType, bySegmentType);
-      typesMap.put(query.getClass(), types);
-    }
+    final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
+    final boolean isBySegment = QueryContexts.isBySegment(query);
+    final boolean isEnableBrokerBackpressure = QueryContexts.isEnableBrokerBackpressure(query);
+
+    final Pair<JavaType, JavaType> types = typesMap.computeIfAbsent(
+        query.getClass(),
+        ignored -> {
+          final TypeFactory typeFactory = objectMapper.getTypeFactory();
+          final JavaType baseType = typeFactory.constructType(toolChest.getResultTypeReference());
+          final JavaType bySegmentType = typeFactory.constructParametrizedType(
+              Result.class,
+              Result.class,
+              typeFactory.constructParametrizedType(
+                  BySegmentResultValueClass.class,
+                  BySegmentResultValueClass.class,
+                  baseType
+              )
+          );
+          return Pair.of(baseType, bySegmentType);
+        }
+    );
 
     final JavaType typeRef;
     if (isBySegment) {
@@ -200,8 +209,8 @@ public int getNumOpenConnections()
 
       final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
       {
-        private final AtomicLong byteCount = new AtomicLong(0);
-        private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
+        private final LongAdder byteCount = new LongAdder();
+        private final TransferQueue<InputStream> queue = new LinkedTransferQueue<>();
         private final AtomicBoolean done = new AtomicBoolean(false);
         private final AtomicReference<String> fail = new AtomicReference<>();
 
@@ -237,11 +246,16 @@ public int getNumOpenConnections()
                   )
               );
             }
-            queue.put(new ChannelBufferInputStream(response.getContent()));
+            final InputStream inputStream = new ChannelBufferInputStream(response.getContent());
+            if (isEnableBrokerBackpressure) {
+              queue.transfer(inputStream);
+            } else {
+              queue.put(inputStream);
+            }
           }
           catch (final IOException e) {
             log.error(e, "Error parsing response context from url [%s]", url);
-            return ClientResponse.<InputStream>finished(
+            return ClientResponse.finished(
                 new InputStream()
                 {
                   @Override
@@ -257,8 +271,8 @@ public int read() throws IOException
             Thread.currentThread().interrupt();
             throw Throwables.propagate(e);
           }
-          byteCount.addAndGet(response.getContent().readableBytes());
-          return ClientResponse.<InputStream>finished(
+          byteCount.add(response.getContent().readableBytes());
+          return ClientResponse.finished(
               new SequenceInputStream(
                   new Enumeration<InputStream>()
                   {
@@ -316,14 +330,19 @@ public InputStream nextElement()
 
           if (bytes > 0) {
             try {
-              queue.put(new ChannelBufferInputStream(channelBuffer));
+              final InputStream inputStream = new ChannelBufferInputStream(channelBuffer);
+              if (isEnableBrokerBackpressure) {
+                queue.transfer(inputStream);
+              } else {
+                queue.put(inputStream);
+              }
             }
             catch (InterruptedException e) {
               log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url);
               Thread.currentThread().interrupt();
               throw Throwables.propagate(e);
             }
-            byteCount.addAndGet(bytes);
+            byteCount.add(bytes);
           }
           return clientResponse;
         }
@@ -331,25 +350,27 @@ public InputStream nextElement()
         @Override
         public ClientResponse<InputStream> done(ClientResponse<InputStream> clientResponse)
         {
-          long stopTimeNs = System.nanoTime();
-          long nodeTimeNs = stopTimeNs - requestStartTimeNs;
+          final long stopTimeNs = System.nanoTime();
+          final long nodeTimeNs = stopTimeNs - requestStartTimeNs;
           final long nodeTimeMs = TimeUnit.NANOSECONDS.toMillis(nodeTimeNs);
+          final long byteCount = this.byteCount.sum();
           log.debug(
               "Completed queryId[%s] request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].",
               query.getId(),
               url,
-              byteCount.get(),
+              byteCount,
               nodeTimeMs,
-              byteCount.get() / (0.001 * nodeTimeMs) // Floating math; division by zero will yield Inf, not exception
+              byteCount / (0.001 * nodeTimeMs) // Floating math; division by zero will yield Inf, not exception
           );
-          QueryMetrics<? super Query<T>> responseMetrics = acquireResponseMetrics();
+          final QueryMetrics<? super Query<T>> responseMetrics = acquireResponseMetrics();
           responseMetrics.reportNodeTime(nodeTimeNs);
-          responseMetrics.reportNodeBytes(byteCount.get());
+          responseMetrics.reportNodeBytes(byteCount);
           responseMetrics.emit(emitter);
           synchronized (done) {
             try {
               // An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out
               // after done is set to true, regardless of the rest of the stream's state.
+              // We don't need to "transfer" this one because there are no more results to force backpressure with
               queue.put(ByteSource.empty().openStream());
             }
             catch (InterruptedException e) {
@@ -365,7 +386,7 @@ public InputStream nextElement()
               done.set(true);
             }
           }
-          return ClientResponse.<InputStream>finished(clientResponse.getObj());
+          return ClientResponse.finished(clientResponse.getObj());
         }
 
         @Override
@@ -384,6 +405,7 @@ private void setupResponseReadFailure(String msg, Throwable th)
         {
           fail.set(msg);
           queue.clear();
+          // Don't need to transfer because this is a terminal state, so no need to block more items coming in
           queue.offer(new InputStream()
           {
             @Override

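
The heart of the change is the switch from LinkedBlockingQueue.put, which enqueues a
chunk and returns immediately, to LinkedTransferQueue.transfer, which blocks the
producing (Netty I/O) thread until a consumer has actually taken the chunk. A minimal,
self-contained sketch of that difference, using only standard java.util.concurrent
classes and made-up chunk strings rather than anything Druid-specific:

    import java.util.concurrent.LinkedTransferQueue;
    import java.util.concurrent.TransferQueue;

    public class TransferVsPut
    {
      public static void main(String[] args) throws InterruptedException
      {
        final TransferQueue<String> queue = new LinkedTransferQueue<>();

        // put() enqueues and returns immediately even though nobody is consuming: no backpressure.
        queue.put("chunk-1");
        System.out.println("put returned immediately; queue size = " + queue.size());

        // A consumer that is deliberately slow, standing in for the broker-side reader.
        final Thread consumer = new Thread(() -> {
          try {
            Thread.sleep(500);
            System.out.println("consumed: " + queue.take());   // takes "chunk-1"
            System.out.println("consumed: " + queue.take());   // takes "chunk-2", unblocking transfer()
          }
          catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
        consumer.start();

        // transfer() does not return until a consumer has taken the element: this is the backpressure.
        final long start = System.nanoTime();
        queue.transfer("chunk-2");
        System.out.printf("transfer blocked for ~%d ms%n", (System.nanoTime() - start) / 1_000_000);
        consumer.join();
      }
    }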

 
