You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/11/20 05:27:34 UTC

[GitHub] [druid] jihoonson commented on a change in pull request #11828: Refactor ResponseContext

jihoonson commented on a change in pull request #11828:
URL: https://github.com/apache/druid/pull/11828#discussion_r753631275



##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContextDeserializer.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.context;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+@SuppressWarnings("serial")
+public class ResponseContextDeserializer extends StdDeserializer<ResponseContext>

Review comment:
       I don't see any unit test for this deserializer. Can you please add some?

##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
##########
@@ -30,93 +33,334 @@
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.NonnullPair;
-import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.BiFunction;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * The context for storing and passing data between chains of {@link org.apache.druid.query.QueryRunner}s.
  * The context is also transferred between Druid nodes with all the data it contains.
- */
+ * <p>
+ * The response context consists of a set of key/value pairs. Keys are those defined in
+ * the {@code Keys} registry. Keys are indexed by key instance, not by name. The
+ * key defines the type of the associated value, including logic to merge values and
+ * to deserialize JSON values for that key.
+ *
+ * <h4>Structure</h4>
+ * The context has evolved to perform multiple tasks. First, it holds two kinds
+ * of information:
+ * <ul>
+ * <li>Information to be returned in the query response header.
+ * (These are values tagged as {@code HEADER}.)</li>
+ * <li>Values passed within a single server. These are tagged with
+ * visibility {@code NONE}.)</li>
+ * </ul>
+ * Second, it performs multiple tasks:
+ * <ul>
+ * <li>Registers the keys to be used in the header. But, since it also holds
+ * internal information, the internal information also needs keys, though the
+ * corresponding values are never serialized.</li>
+ * <li>Gathers information for the query as a whole.</li>
+ * <li>Merges information back up the query tree: from multiple segments,
+ * from multiple servers, etc.</li>
+ * <li>Manages headers size by dropping fields when the header would get too
+ * large.</li>
+ * </ul>
+ *
+ * A result is that the information the context, when inspected by a calling
+ * query, may be incomplete if some of it was previously dropped by the
+ * called query.
+ *
+ * <h4>API</h4>
+ *
+ * The query profile needs to obtain the full, untruncated information. To do this
+ * it piggy-backs on the set operations to obtain the full value. To ensure this
+ * is possible, code that works with standard values should call the set (or add)
+ * functions provided which will do the needed map update.
+  */
 @PublicApi
 public abstract class ResponseContext
 {
   /**
    * The base interface of a response context key.
    * Should be implemented by every context key.
    */
-  public interface BaseKey
+  public interface Key
   {
+    /**
+     * Hack, pure and simple. The symbol "ResponseContext.Key.ETAG" must exist
+     * to avoid changing a line of code where we have no tests, which causes
+     * the build to fail. Remove this once the proper tests are added.
+     *
+     * @see {@link org.apache.druid.server.QueryResource#doPost}
+     */
+    public static final Key ETAG = Keys.ETAG;
+
     @JsonValue
     String getName();
+
+    /**
+     * The phase (header, trailer, none) where this key is emitted.
+     */
+    Visibility getPhase();
+
     /**
-     * Merge function associated with a key: Object (Object oldValue, Object newValue)
+     * Reads a value of this key from a JSON stream. Used by {@link ResponseContextDeserializer}.
+     */
+    Object readValue(JsonParser jp);
+
+    /**
+     * Merges two values of type T.
+     *
+     * This method may modify "oldValue" but must not modify "newValue".
      */
-    BiFunction<Object, Object, Object> getMergeFunction();
+    Object mergeValues(Object oldValue, Object newValue);
+
+    /**
+     * Returns true if this key can be removed to reduce header size when the
+     * header would otherwise be too large.
+     */
+    @JsonIgnore
+    boolean canDrop();
+  }
+
+  /**
+   * Where the key is emitted, if at all. Values in the context can be for internal
+   * use only: for return before the query results (header) or only after query
+   * results (trailer).
+   */
+  public enum Visibility
+  {
+    /**
+     * Keys that are present in both the "X-Druid-Response-Context" header *and* the response context trailer.
+     */
+    HEADER,
+
+    /**
+     * Keys that are not present in query responses at all. Generally used for internal state tracking within a
+     * single server.
+     */
+    NONE
+  }
+
+  /**
+   * Abstract key class which provides most functionality except the
+   * type-specific merge logic. Parsing is provided by an associated
+   * parse function.
+   */
+  public abstract static class AbstractKey implements Key
+  {
+    private final String name;
+    private final Visibility visibility;
+    private final boolean canDrop;
+    private final Function<JsonParser, Object> parseFunction;
+
+    AbstractKey(String name, Visibility visibility, boolean canDrop, Class<?> serializedClass)
+    {
+      this.name = name;
+      this.visibility = visibility;
+      this.canDrop = canDrop;
+      this.parseFunction = jp -> {
+        try {
+          return jp.readValueAs(serializedClass);
+        }
+        catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      };
+    }
+
+    AbstractKey(String name, Visibility visibility, boolean canDrop, TypeReference<?> serializedTypeReference)
+    {
+      this.name = name;
+      this.visibility = visibility;
+      this.canDrop = canDrop;
+      this.parseFunction = jp -> {
+        try {
+          return jp.readValueAs(serializedTypeReference);
+        }
+        catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      };
+    }
+
+    @Override
+    public String getName()
+    {
+      return name;
+    }
+
+    @Override
+    public Visibility getPhase()
+    {
+      return visibility;
+    }
+
+    @Override
+    public boolean canDrop()
+    {
+      return canDrop;
+    }
+
+    @Override
+    public Object readValue(JsonParser jp)
+    {
+      return parseFunction.apply(jp);
+    }
+
+    @Override
+    public String toString()
+    {
+      return name;
+    }
+  }
+
+  /**
+   * String valued attribute that holds the latest value assigned.
+   */
+  public static class StringKey extends AbstractKey
+  {
+    StringKey(String name, Visibility visibility, boolean canDrop)
+    {
+      super(name, visibility, canDrop, String.class);
+    }
+
+    @Override
+    public Object mergeValues(Object oldValue, Object newValue)
+    {
+      return newValue;
+    }
+  }
+
+  /**
+   * Boolean valued attribute with the semantics that once the flag is
+   * set true, it stays true.
+   */
+  public static class BooleanKey extends AbstractKey
+  {
+    BooleanKey(String name, Visibility visibility)
+    {
+      super(name, visibility, false, Boolean.class);
+    }
+
+    @Override
+    public Object mergeValues(Object oldValue, Object newValue)
+    {
+      return (boolean) oldValue || (boolean) newValue;
+    }
+  }
+
+  /**
+   * Long valued attribute that holds the latest value assigned.
+   */
+  public static class LongKey extends AbstractKey
+  {
+    LongKey(String name, Visibility visibility)
+    {
+      super(name, visibility, false, Long.class);
+    }
+
+    @Override
+    public Object mergeValues(Object oldValue, Object newValue)
+    {
+      return newValue;
+    }
   }
 
   /**
-   * Keys associated with objects in the context.
+   * Long valued attribute that holds the accumulation of values assigned.
+   */
+  public static class CounterKey extends AbstractKey
+  {
+    CounterKey(String name, Visibility visibility)
+    {
+      super(name, visibility, false, Long.class);
+    }
+
+    @Override
+    public Object mergeValues(Object oldValue, Object newValue)
+    {
+      if (oldValue == null) {
+        return newValue;
+      }
+      if (newValue == null) {
+        return oldValue;
+      }
+      return (Long) oldValue + (Long) newValue;
+    }
+  }
+
+  /**
+   * Global registry of response context keys.
+   * <p>
+   * Also defines the standard keys associated with objects in the context.
    * <p>
    * If it's necessary to have some new keys in the context then they might be listed in a separate enum:

Review comment:
       ```suggestion
      * If it's necessary to have some new keys in the context then they might be listed in a separate class:
   ```

##########
File path: server/src/main/java/org/apache/druid/query/RetryQueryRunner.java
##########
@@ -157,7 +156,11 @@ public void cleanup(RetryingSequenceIterator iterFromMake)
       throw new ISE("Failed to check missing segments due to missing responses from [%d] servers", remainingResponses);
     }
 
-    final Object maybeMissingSegments = context.get(ResponseContext.Key.MISSING_SEGMENTS);
+    // TODO: the sender's response may contain a truncated list of missing segments.
+    // Truncation is aggregated in the response context given as a parameter.
+    // Check the getTruncated() value: if true, then the we don't know the full set of
+    // missing segments.

Review comment:
       Good catch. Can you please file a bug issue on GitHub, so that people can be aware of this bug?

##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContextDeserializer.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.context;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+@SuppressWarnings("serial")
+public class ResponseContextDeserializer extends StdDeserializer<ResponseContext>
+{
+  public ResponseContextDeserializer()
+  {
+    super(ResponseContext.class);
+  }
+
+  @Override
+  public ResponseContext deserialize(
+      final JsonParser jp,
+      final DeserializationContext ctxt
+  ) throws IOException
+  {
+    if (jp.currentToken() != JsonToken.START_OBJECT) {
+      throw ctxt.wrongTokenException(jp, ResponseContext.class, JsonToken.START_OBJECT, null);
+    }
+
+    // The response context is created for single-thread use. (That is,
+    // it is non-concurrent.) Clients of this code should convert the
+    // context to concurrent if it will be used across threads.

Review comment:
       This seems worth putting in Javadoc of either this class or this method.

##########
File path: server/src/main/java/org/apache/druid/server/QueryResource.java
##########
@@ -214,6 +216,9 @@ public Response doPost(
       final Sequence<?> results = queryResponse.getResults();
       final ResponseContext responseContext = queryResponse.getResponseContext();
       final String prevEtag = getPreviousEtag(req);
+      // In the following line, responseContext.get(ResponseContext.Key.ETAG) should be
+      // replaced with responseContext.getEntityTag(). But, we have no tests for one of the
+      // code paths, so a build check fails, meaning that the change can't be made at this time.

Review comment:
       Does this cause any actual change in the logic? If not, I would say, let's ignore the bot and fix it in this PR. 

##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
##########
@@ -30,93 +33,334 @@
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.NonnullPair;
-import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.BiFunction;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * The context for storing and passing data between chains of {@link org.apache.druid.query.QueryRunner}s.
  * The context is also transferred between Druid nodes with all the data it contains.
- */
+ * <p>
+ * The response context consists of a set of key/value pairs. Keys are those defined in
+ * the {@code Keys} registry. Keys are indexed by key instance, not by name. The
+ * key defines the type of the associated value, including logic to merge values and
+ * to deserialize JSON values for that key.
+ *
+ * <h4>Structure</h4>
+ * The context has evolved to perform multiple tasks. First, it holds two kinds
+ * of information:
+ * <ul>
+ * <li>Information to be returned in the query response header.
+ * (These are values tagged as {@code HEADER}.)</li>
+ * <li>Values passed within a single server. These are tagged with
+ * visibility {@code NONE}.)</li>

Review comment:
       ```suggestion
    * {@link Visibility#NONE}.)</li>
   ```

##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
##########
@@ -30,93 +33,334 @@
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.NonnullPair;
-import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.BiFunction;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * The context for storing and passing data between chains of {@link org.apache.druid.query.QueryRunner}s.
  * The context is also transferred between Druid nodes with all the data it contains.
- */
+ * <p>
+ * The response context consists of a set of key/value pairs. Keys are those defined in
+ * the {@code Keys} registry. Keys are indexed by key instance, not by name. The
+ * key defines the type of the associated value, including logic to merge values and
+ * to deserialize JSON values for that key.
+ *
+ * <h4>Structure</h4>
+ * The context has evolved to perform multiple tasks. First, it holds two kinds
+ * of information:
+ * <ul>
+ * <li>Information to be returned in the query response header.
+ * (These are values tagged as {@code HEADER}.)</li>

Review comment:
       ```suggestion
    * (These are values tagged as {@link Visibility#HEADER}.)</li>
   ```

##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
##########
@@ -30,93 +33,334 @@
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.NonnullPair;
-import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.BiFunction;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * The context for storing and passing data between chains of {@link org.apache.druid.query.QueryRunner}s.
  * The context is also transferred between Druid nodes with all the data it contains.
- */
+ * <p>
+ * The response context consists of a set of key/value pairs. Keys are those defined in
+ * the {@code Keys} registry. Keys are indexed by key instance, not by name. The
+ * key defines the type of the associated value, including logic to merge values and
+ * to deserialize JSON values for that key.
+ *
+ * <h4>Structure</h4>
+ * The context has evolved to perform multiple tasks. First, it holds two kinds
+ * of information:
+ * <ul>
+ * <li>Information to be returned in the query response header.
+ * (These are values tagged as {@code HEADER}.)</li>
+ * <li>Values passed within a single server. These are tagged with
+ * visibility {@code NONE}.)</li>
+ * </ul>
+ * Second, it performs multiple tasks:
+ * <ul>
+ * <li>Registers the keys to be used in the header. But, since it also holds
+ * internal information, the internal information also needs keys, though the
+ * corresponding values are never serialized.</li>
+ * <li>Gathers information for the query as a whole.</li>
+ * <li>Merges information back up the query tree: from multiple segments,
+ * from multiple servers, etc.</li>
+ * <li>Manages headers size by dropping fields when the header would get too
+ * large.</li>
+ * </ul>
+ *
+ * A result is that the information the context, when inspected by a calling
+ * query, may be incomplete if some of it was previously dropped by the
+ * called query.
+ *
+ * <h4>API</h4>
+ *
+ * The query profile needs to obtain the full, untruncated information. To do this
+ * it piggy-backs on the set operations to obtain the full value. To ensure this
+ * is possible, code that works with standard values should call the set (or add)
+ * functions provided which will do the needed map update.
+  */
 @PublicApi
 public abstract class ResponseContext
 {
   /**
    * The base interface of a response context key.
    * Should be implemented by every context key.
    */
-  public interface BaseKey
+  public interface Key
   {
+    /**
+     * Hack, pure and simple. The symbol "ResponseContext.Key.ETAG" must exist
+     * to avoid changing a line of code where we have no tests, which causes
+     * the build to fail. Remove this once the proper tests are added.
+     *
+     * @see {@link org.apache.druid.server.QueryResource#doPost}
+     */
+    public static final Key ETAG = Keys.ETAG;
+
     @JsonValue
     String getName();
+
+    /**
+     * The phase (header, trailer, none) where this key is emitted.
+     */
+    Visibility getPhase();
+
     /**
-     * Merge function associated with a key: Object (Object oldValue, Object newValue)
+     * Reads a value of this key from a JSON stream. Used by {@link ResponseContextDeserializer}.
+     */
+    Object readValue(JsonParser jp);
+
+    /**
+     * Merges two values of type T.
+     *
+     * This method may modify "oldValue" but must not modify "newValue".
      */
-    BiFunction<Object, Object, Object> getMergeFunction();
+    Object mergeValues(Object oldValue, Object newValue);
+
+    /**
+     * Returns true if this key can be removed to reduce header size when the
+     * header would otherwise be too large.
+     */
+    @JsonIgnore
+    boolean canDrop();
+  }
+
+  /**
+   * Where the key is emitted, if at all. Values in the context can be for internal
+   * use only: for return before the query results (header) or only after query
+   * results (trailer).
+   */
+  public enum Visibility
+  {
+    /**
+     * Keys that are present in both the "X-Druid-Response-Context" header *and* the response context trailer.
+     */
+    HEADER,
+
+    /**
+     * Keys that are not present in query responses at all. Generally used for internal state tracking within a
+     * single server.
+     */
+    NONE
+  }
+
+  /**
+   * Abstract key class which provides most functionality except the
+   * type-specific merge logic. Parsing is provided by an associated
+   * parse function.
+   */
+  public abstract static class AbstractKey implements Key
+  {
+    private final String name;
+    private final Visibility visibility;
+    private final boolean canDrop;
+    private final Function<JsonParser, Object> parseFunction;
+
+    AbstractKey(String name, Visibility visibility, boolean canDrop, Class<?> serializedClass)
+    {
+      this.name = name;
+      this.visibility = visibility;
+      this.canDrop = canDrop;
+      this.parseFunction = jp -> {
+        try {
+          return jp.readValueAs(serializedClass);
+        }
+        catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      };
+    }
+
+    AbstractKey(String name, Visibility visibility, boolean canDrop, TypeReference<?> serializedTypeReference)
+    {
+      this.name = name;
+      this.visibility = visibility;
+      this.canDrop = canDrop;
+      this.parseFunction = jp -> {
+        try {
+          return jp.readValueAs(serializedTypeReference);
+        }
+        catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      };
+    }
+
+    @Override
+    public String getName()
+    {
+      return name;
+    }
+
+    @Override
+    public Visibility getPhase()
+    {
+      return visibility;
+    }
+
+    @Override
+    public boolean canDrop()
+    {
+      return canDrop;
+    }
+
+    @Override
+    public Object readValue(JsonParser jp)
+    {
+      return parseFunction.apply(jp);
+    }
+
+    @Override
+    public String toString()
+    {
+      return name;
+    }
+  }
+
+  /**
+   * String valued attribute that holds the latest value assigned.
+   */
+  public static class StringKey extends AbstractKey
+  {
+    StringKey(String name, Visibility visibility, boolean canDrop)
+    {
+      super(name, visibility, canDrop, String.class);
+    }
+
+    @Override
+    public Object mergeValues(Object oldValue, Object newValue)
+    {
+      return newValue;
+    }
+  }
+
+  /**
+   * Boolean valued attribute with the semantics that once the flag is
+   * set true, it stays true.
+   */
+  public static class BooleanKey extends AbstractKey
+  {
+    BooleanKey(String name, Visibility visibility)
+    {
+      super(name, visibility, false, Boolean.class);
+    }
+
+    @Override
+    public Object mergeValues(Object oldValue, Object newValue)
+    {
+      return (boolean) oldValue || (boolean) newValue;
+    }
+  }
+
+  /**
+   * Long valued attribute that holds the latest value assigned.
+   */
+  public static class LongKey extends AbstractKey
+  {
+    LongKey(String name, Visibility visibility)
+    {
+      super(name, visibility, false, Long.class);
+    }
+
+    @Override
+    public Object mergeValues(Object oldValue, Object newValue)
+    {
+      return newValue;
+    }
   }
 
   /**
-   * Keys associated with objects in the context.
+   * Long valued attribute that holds the accumulation of values assigned.
+   */
+  public static class CounterKey extends AbstractKey
+  {
+    CounterKey(String name, Visibility visibility)
+    {
+      super(name, visibility, false, Long.class);
+    }
+
+    @Override
+    public Object mergeValues(Object oldValue, Object newValue)
+    {
+      if (oldValue == null) {
+        return newValue;
+      }
+      if (newValue == null) {
+        return oldValue;
+      }
+      return (Long) oldValue + (Long) newValue;
+    }
+  }
+
+  /**
+   * Global registry of response context keys.
+   * <p>
+   * Also defines the standard keys associated with objects in the context.
    * <p>
    * If it's necessary to have some new keys in the context then they might be listed in a separate enum:
    * <pre>{@code
-   * public enum ExtensionResponseContextKey implements BaseKey
+   * public class SomeClass
    * {
-   *   EXTENSION_KEY_1("extension_key_1"), EXTENSION_KEY_2("extension_key_2");
+   *   static final Key EXTENSION_KEY_1 = new StringKey(
+   *      "extension_key_1", Visibility.HEADER_AND_TRAILER, true),
+   *   static final Key EXTENSION_KEY_2 = new CounterKey(
+   *      "extension_key_2", Visibility.None);
    *
    *   static {
-   *     for (BaseKey key : values()) ResponseContext.Key.registerKey(key);
+   *     Keys.instance().registerKeys(new Key[] {
+   *        EXTENSION_KEY_1,
+   *        EXTENSION_KEY_2
+   *     });
    *   }
-   *
-   *   private final String name;
-   *   private final BiFunction<Object, Object, Object> mergeFunction;
-   *
-   *   ExtensionResponseContextKey(String name)
-   *   {
-   *     this.name = name;
-   *     this.mergeFunction = (oldValue, newValue) -> newValue;
-   *   }
-   *
-   *   @Override public String getName() { return name; }
-   *
-   *   @Override public BiFunction<Object, Object, Object> getMergeFunction() { return mergeFunction; }
-   * }
    * }</pre>
-   * Make sure all extension enum values added with {@link Key#registerKey} method.
+   * Make sure all extension keys are added with the {@link #registerKey(Key)} or
+   * {@link #registerKeys(Key[])} methods.
+   * <p>
+   * Predefined key types exist for common values. Custom values can be created as
+   * shown below.

Review comment:
       Did you mean to put this sentence somewhere above?

##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
##########
@@ -129,214 +373,373 @@
      *
      * @see org.apache.druid.query.Query#getMostSpecificId
      */
-    REMAINING_RESPONSES_FROM_QUERY_SERVERS(
+    public static final Key REMAINING_RESPONSES_FROM_QUERY_SERVERS = new AbstractKey(
         "remainingResponsesFromQueryServers",
-            (totalRemainingPerId, idAndNumResponses) -> {
-              final ConcurrentHashMap<String, Integer> map = (ConcurrentHashMap<String, Integer>) totalRemainingPerId;
-              final NonnullPair<String, Integer> pair = (NonnullPair<String, Integer>) idAndNumResponses;
-              map.compute(
-                  pair.lhs,
-                  (id, remaining) -> remaining == null ? pair.rhs : remaining + pair.rhs
-              );
-              return map;
-            }
-    ),
+        Visibility.NONE, true,
+        Object.class)
+    {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Object mergeValues(Object totalRemainingPerId, Object idAndNumResponses)
+      {
+        final ConcurrentHashMap<String, Integer> map = (ConcurrentHashMap<String, Integer>) totalRemainingPerId;
+        final NonnullPair<String, Integer> pair = (NonnullPair<String, Integer>) idAndNumResponses;
+        map.compute(
+            pair.lhs,
+            (id, remaining) -> remaining == null ? pair.rhs : remaining + pair.rhs);
+        return map;
+      }
+    };
+
     /**
      * Lists missing segments.
      */
-    MISSING_SEGMENTS(
+    public static final Key MISSING_SEGMENTS = new AbstractKey(
         "missingSegments",
-            (oldValue, newValue) -> {
-              final ArrayList<SegmentDescriptor> result = new ArrayList<SegmentDescriptor>((List) oldValue);
-              result.addAll((List) newValue);
-              return result;
-            }
-    ),
+        Visibility.HEADER, true,
+        new TypeReference<List<SegmentDescriptor>>() {})
+    {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Object mergeValues(Object oldValue, Object newValue)
+      {
+        final List<SegmentDescriptor> result = new ArrayList<SegmentDescriptor>((List<SegmentDescriptor>) oldValue);
+        result.addAll((List<SegmentDescriptor>) newValue);
+        return result;
+      }
+    };
+
     /**
      * Entity tag. A part of HTTP cache validation mechanism.
      * Is being removed from the context before sending and used as a separate HTTP header.
      */
-    ETAG("ETag"),
+    public static final Key ETAG = new StringKey("ETag", Visibility.NONE, true);
+
     /**
-     * Query fail time (current time + timeout).
-     * It is not updated continuously as {@link Key#TIMEOUT_AT}.
+     * Query total bytes gathered.
      */
-    QUERY_FAIL_DEADLINE_MILLIS("queryFailTime"),
+    public static final Key QUERY_TOTAL_BYTES_GATHERED = new AbstractKey(
+        "queryTotalBytesGathered",
+        Visibility.NONE, false,
+        new TypeReference<AtomicLong>() {})
+    {
+      @Override
+      public Object mergeValues(Object oldValue, Object newValue)
+      {
+        return ((AtomicLong) newValue).addAndGet(((AtomicLong) newValue).get());
+      }
+    };
+
     /**
-     * Query total bytes gathered.
+     * Query fail time (current time + timeout).
+     * It is not updated continuously as {@link Keys#TIMEOUT_AT}.
      */
-    QUERY_TOTAL_BYTES_GATHERED("queryTotalBytesGathered"),
+    public static final Key QUERY_FAIL_DEADLINE_MILLIS = new LongKey(
+        "queryFailTime",
+        Visibility.NONE);
+
     /**
      * This variable indicates when a running query should be expired,
      * and is effective only when 'timeout' of queryContext has a positive value.
      * Continuously updated by {@link org.apache.druid.query.scan.ScanQueryEngine}
      * by reducing its value on the time of every scan iteration.
      */
-    TIMEOUT_AT("timeoutAt"),
+    public static final Key TIMEOUT_AT = new LongKey(
+        "timeoutAt",
+        Visibility.NONE);
+
     /**
-     * The number of scanned rows.
-     * For backward compatibility the context key name still equals to "count".
+     * The number of rows scanned by {@link org.apache.druid.query.scan.ScanQueryEngine}.
+     *
+     * Named "count" for backwards compatibility with older data servers that still send this, even though it's now
+     * marked with {@link Visibility#NONE}.
      */
-    NUM_SCANNED_ROWS(
+    public static final Key NUM_SCANNED_ROWS = new CounterKey(
         "count",
-            (oldValue, newValue) -> ((Number) oldValue).longValue() + ((Number) newValue).longValue()
-    ),
+        Visibility.NONE);
+
     /**
      * The total CPU time for threads related to Sequence processing of the query.
      * Resulting value on a Broker is a sum of downstream values from historicals / realtime nodes.
      * For additional information see {@link org.apache.druid.query.CPUTimeMetricQueryRunner}
      */
-    CPU_CONSUMED_NANOS(
+    public static final Key CPU_CONSUMED_NANOS = new CounterKey(
         "cpuConsumed",
-            (oldValue, newValue) -> ((Number) oldValue).longValue() + ((Number) newValue).longValue()
-    ),
+        Visibility.NONE);
+
     /**
      * Indicates if a {@link ResponseContext} was truncated during serialization.
      */
-    TRUNCATED(
+    public static final Key TRUNCATED = new BooleanKey(
         "truncated",
-            (oldValue, newValue) -> (boolean) oldValue || (boolean) newValue
-    );
+        Visibility.HEADER);
+
+
+    /**
+     * One and only global list of keys. This is a semi-constant: it is mutable
+     * at start-up time, but then is not thread-safe, and must remain unchanged
+     * for the duration of the server run.
+     */
+    public static final Keys INSTANCE = new Keys();

Review comment:
       Please add a private constructor, so that we don't mistakenly create an instance instead of using this.

##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
##########
@@ -30,93 +33,334 @@
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.NonnullPair;
-import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.BiFunction;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * The context for storing and passing data between chains of {@link org.apache.druid.query.QueryRunner}s.
  * The context is also transferred between Druid nodes with all the data it contains.
- */
+ * <p>
+ * The response context consists of a set of key/value pairs. Keys are those defined in
+ * the {@code Keys} registry. Keys are indexed by key instance, not by name. The
+ * key defines the type of the associated value, including logic to merge values and
+ * to deserialize JSON values for that key.
+ *
+ * <h4>Structure</h4>
+ * The context has evolved to perform multiple tasks. First, it holds two kinds
+ * of information:
+ * <ul>
+ * <li>Information to be returned in the query response header.
+ * (These are values tagged as {@code HEADER}.)</li>
+ * <li>Values passed within a single server. These are tagged with
+ * visibility {@code NONE}.)</li>
+ * </ul>
+ * Second, it performs multiple tasks:
+ * <ul>
+ * <li>Registers the keys to be used in the header. But, since it also holds
+ * internal information, the internal information also needs keys, though the
+ * corresponding values are never serialized.</li>
+ * <li>Gathers information for the query as a whole.</li>
+ * <li>Merges information back up the query tree: from multiple segments,
+ * from multiple servers, etc.</li>
+ * <li>Manages headers size by dropping fields when the header would get too
+ * large.</li>
+ * </ul>
+ *
+ * A result is that the information the context, when inspected by a calling
+ * query, may be incomplete if some of it was previously dropped by the
+ * called query.
+ *
+ * <h4>API</h4>
+ *
+ * The query profile needs to obtain the full, untruncated information. To do this
+ * it piggy-backs on the set operations to obtain the full value. To ensure this
+ * is possible, code that works with standard values should call the set (or add)
+ * functions provided which will do the needed map update.
+  */
 @PublicApi
 public abstract class ResponseContext
 {
   /**
    * The base interface of a response context key.
    * Should be implemented by every context key.
    */
-  public interface BaseKey
+  public interface Key

Review comment:
       Hmm, I think this interface should be marked `ExtensionPoint`. `PublicApi` doesn't seem enough because [`PublicAPI` is not meant to be subclassed, but only means it's stable for callers](https://github.com/apache/druid/blob/master/core/src/main/java/org/apache/druid/guice/annotations/PublicApi.java#L32-L35). @paul-rogers can you add the `ExtensionPoint` annotation for this interface? Even though it's not marked, I agree that this change should be called out in the release notes. 

##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
##########
@@ -351,87 +754,100 @@ public void merge(ResponseContext responseContext)
   /**
    * Serializes the context given that the resulting string length is less than the provided limit.
    * This method removes some elements from context collections if it's needed to satisfy the limit.
-   * There is no explicit priorities of keys which values are being truncated because for now there are only
-   * two potential limit breaking keys ({@link Key#UNCOVERED_INTERVALS}
-   * and {@link Key#MISSING_SEGMENTS}) and their values are arrays.
-   * Thus current implementation considers these arrays as equal prioritized and starts removing elements from
+   * There is no explicit priorities of keys which values are being truncated.
+   * Any kind of key can be removed, the key's @{code canDrop()} attribute indicates
+   * which can be dropped. (The unit tests use a string key.)
+   * Thus keys as equally prioritized and starts removing elements from
    * the array which serialized value length is the biggest.
-   * The resulting string might be correctly deserialized to {@link ResponseContext}.
+   * The resulting string will be correctly deserialized to {@link ResponseContext}.
    */
-  public SerializationResult serializeWith(ObjectMapper objectMapper, int maxCharsNumber) throws JsonProcessingException
+  public SerializationResult serializeWith(ObjectMapper objectMapper, int maxCharsNumber)
+      throws JsonProcessingException
   {
-    final String fullSerializedString = objectMapper.writeValueAsString(getDelegate());
+    final Map<Key, Object> headerMap =
+        getDelegate().entrySet()
+                     .stream()
+                     .filter(entry -> entry.getKey().getPhase() == Visibility.HEADER)
+                     .collect(
+                         Collectors.toMap(
+                             Map.Entry::getKey,
+                             Map.Entry::getValue
+                         )
+                     );
+
+    final String fullSerializedString = objectMapper.writeValueAsString(headerMap);
     if (fullSerializedString.length() <= maxCharsNumber) {
       return new SerializationResult(null, fullSerializedString);
-    } else {
-      // Indicates that the context is truncated during serialization.
-      add(Key.TRUNCATED, true);
-      final ObjectNode contextJsonNode = objectMapper.valueToTree(getDelegate());
-      final ArrayList<Map.Entry<String, JsonNode>> sortedNodesByLength = Lists.newArrayList(contextJsonNode.fields());
-      sortedNodesByLength.sort(VALUE_LENGTH_REVERSED_COMPARATOR);
-      int needToRemoveCharsNumber = fullSerializedString.length() - maxCharsNumber;
-      // The complexity of this block is O(n*m*log(m)) where n - context size, m - context's array size
-      for (Map.Entry<String, JsonNode> e : sortedNodesByLength) {
-        final String fieldName = e.getKey();
-        final JsonNode node = e.getValue();
-        if (node.isArray()) {
-          if (needToRemoveCharsNumber >= node.toString().length()) {
-            // We need to remove more chars than the field's length so removing it completely
-            contextJsonNode.remove(fieldName);
-            // Since the field is completely removed (name + value) we need to do a recalculation
-            needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber;
-          } else {
-            final ArrayNode arrayNode = (ArrayNode) node;
-            needToRemoveCharsNumber -= removeNodeElementsToSatisfyCharsLimit(arrayNode, needToRemoveCharsNumber);
-            if (arrayNode.size() == 0) {
-              // The field is empty, removing it because an empty array field may be misleading
-              // for the recipients of the truncated response context.
-              contextJsonNode.remove(fieldName);
-              // Since the field is completely removed (name + value) we need to do a recalculation
-              needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber;
-            }
-          } // node is not an array
-        } else {
-          // A context should not contain nulls so we completely remove the field.
+    }
+
+    int needToRemoveCharsNumber = fullSerializedString.length() - maxCharsNumber;
+    // Indicates that the context is truncated during serialization.
+    headerMap.put(Keys.TRUNCATED, true);
+    // Account for the extra field just added
+    needToRemoveCharsNumber += Keys.TRUNCATED.getName().length() + 7;
+    final ObjectNode contextJsonNode = objectMapper.valueToTree(headerMap);
+    final List<Map.Entry<String, JsonNode>> sortedNodesByLength = Lists.newArrayList(contextJsonNode.fields());
+    sortedNodesByLength.sort(VALUE_LENGTH_REVERSED_COMPARATOR);
+    // The complexity of this block is O(n*m*log(m)) where n - context size, m - context's array size
+    for (Map.Entry<String, JsonNode> e : sortedNodesByLength) {
+      final String fieldName = e.getKey();
+      if (!Keys.instance().keyOf(fieldName).canDrop()) {
+        continue;
+      }
+      final JsonNode node = e.getValue();
+      int removeLength = fieldName.toString().length() + node.toString().length();
+      if (removeLength < needToRemoveCharsNumber || !(node instanceof ArrayNode)) {

Review comment:
       ```suggestion
         if (removeLength < needToRemoveCharsNumber || !node.isArray()) {
   ```

##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
##########
@@ -30,93 +33,334 @@
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.NonnullPair;
-import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.BiFunction;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * The context for storing and passing data between chains of {@link org.apache.druid.query.QueryRunner}s.
  * The context is also transferred between Druid nodes with all the data it contains.
- */
+ * <p>
+ * The response context consists of a set of key/value pairs. Keys are those defined in
+ * the {@code Keys} registry. Keys are indexed by key instance, not by name. The
+ * key defines the type of the associated value, including logic to merge values and
+ * to deserialize JSON values for that key.
+ *
+ * <h4>Structure</h4>
+ * The context has evolved to perform multiple tasks. First, it holds two kinds
+ * of information:
+ * <ul>
+ * <li>Information to be returned in the query response header.
+ * (These are values tagged as {@code HEADER}.)</li>
+ * <li>Values passed within a single server. These are tagged with
+ * visibility {@code NONE}.)</li>
+ * </ul>
+ * Second, it performs multiple tasks:
+ * <ul>
+ * <li>Registers the keys to be used in the header. But, since it also holds
+ * internal information, the internal information also needs keys, though the
+ * corresponding values are never serialized.</li>
+ * <li>Gathers information for the query as a whole.</li>
+ * <li>Merges information back up the query tree: from multiple segments,
+ * from multiple servers, etc.</li>
+ * <li>Manages headers size by dropping fields when the header would get too
+ * large.</li>
+ * </ul>
+ *
+ * A result is that the information the context, when inspected by a calling
+ * query, may be incomplete if some of it was previously dropped by the
+ * called query.
+ *
+ * <h4>API</h4>
+ *
+ * The query profile needs to obtain the full, untruncated information. To do this
+ * it piggy-backs on the set operations to obtain the full value. To ensure this
+ * is possible, code that works with standard values should call the set (or add)
+ * functions provided which will do the needed map update.
+  */
 @PublicApi
 public abstract class ResponseContext
 {
   /**
    * The base interface of a response context key.
    * Should be implemented by every context key.
    */
-  public interface BaseKey
+  public interface Key
   {
+    /**
+     * Hack, pure and simple. The symbol "ResponseContext.Key.ETAG" must exist
+     * to avoid changing a line of code where we have no tests, which causes
+     * the build to fail. Remove this once the proper tests are added.
+     *
+     * @see {@link org.apache.druid.server.QueryResource#doPost}
+     */
+    public static final Key ETAG = Keys.ETAG;
+
     @JsonValue
     String getName();
+
+    /**
+     * The phase (header, trailer, none) where this key is emitted.
+     */
+    Visibility getPhase();
+
     /**
-     * Merge function associated with a key: Object (Object oldValue, Object newValue)
+     * Reads a value of this key from a JSON stream. Used by {@link ResponseContextDeserializer}.
+     */
+    Object readValue(JsonParser jp);
+
+    /**
+     * Merges two values of type T.
+     *
+     * This method may modify "oldValue" but must not modify "newValue".
      */
-    BiFunction<Object, Object, Object> getMergeFunction();
+    Object mergeValues(Object oldValue, Object newValue);
+
+    /**
+     * Returns true if this key can be removed to reduce header size when the
+     * header would otherwise be too large.
+     */
+    @JsonIgnore
+    boolean canDrop();
+  }
+
+  /**
+   * Where the key is emitted, if at all. Values in the context can be for internal
+   * use only: for return before the query results (header) or only after query
+   * results (trailer).
+   */
+  public enum Visibility
+  {
+    /**
+     * Keys that are present in both the "X-Druid-Response-Context" header *and* the response context trailer.
+     */
+    HEADER,

Review comment:
       What sort of keys will be in both the header and the trailer? Also, should it be something like `HEADER_AND_TRAILER`?

##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
##########
@@ -351,87 +754,100 @@ public void merge(ResponseContext responseContext)
   /**
    * Serializes the context given that the resulting string length is less than the provided limit.
    * This method removes some elements from context collections if it's needed to satisfy the limit.
-   * There is no explicit priorities of keys which values are being truncated because for now there are only
-   * two potential limit breaking keys ({@link Key#UNCOVERED_INTERVALS}
-   * and {@link Key#MISSING_SEGMENTS}) and their values are arrays.
-   * Thus current implementation considers these arrays as equal prioritized and starts removing elements from
+   * There is no explicit priorities of keys which values are being truncated.
+   * Any kind of key can be removed, the key's @{code canDrop()} attribute indicates
+   * which can be dropped. (The unit tests use a string key.)
+   * Thus keys as equally prioritized and starts removing elements from
    * the array which serialized value length is the biggest.
-   * The resulting string might be correctly deserialized to {@link ResponseContext}.
+   * The resulting string will be correctly deserialized to {@link ResponseContext}.
    */
-  public SerializationResult serializeWith(ObjectMapper objectMapper, int maxCharsNumber) throws JsonProcessingException
+  public SerializationResult serializeWith(ObjectMapper objectMapper, int maxCharsNumber)
+      throws JsonProcessingException
   {
-    final String fullSerializedString = objectMapper.writeValueAsString(getDelegate());
+    final Map<Key, Object> headerMap =
+        getDelegate().entrySet()
+                     .stream()
+                     .filter(entry -> entry.getKey().getPhase() == Visibility.HEADER)
+                     .collect(
+                         Collectors.toMap(
+                             Map.Entry::getKey,
+                             Map.Entry::getValue
+                         )
+                     );
+
+    final String fullSerializedString = objectMapper.writeValueAsString(headerMap);
     if (fullSerializedString.length() <= maxCharsNumber) {
       return new SerializationResult(null, fullSerializedString);
-    } else {
-      // Indicates that the context is truncated during serialization.
-      add(Key.TRUNCATED, true);
-      final ObjectNode contextJsonNode = objectMapper.valueToTree(getDelegate());
-      final ArrayList<Map.Entry<String, JsonNode>> sortedNodesByLength = Lists.newArrayList(contextJsonNode.fields());
-      sortedNodesByLength.sort(VALUE_LENGTH_REVERSED_COMPARATOR);
-      int needToRemoveCharsNumber = fullSerializedString.length() - maxCharsNumber;
-      // The complexity of this block is O(n*m*log(m)) where n - context size, m - context's array size
-      for (Map.Entry<String, JsonNode> e : sortedNodesByLength) {
-        final String fieldName = e.getKey();
-        final JsonNode node = e.getValue();
-        if (node.isArray()) {
-          if (needToRemoveCharsNumber >= node.toString().length()) {
-            // We need to remove more chars than the field's length so removing it completely
-            contextJsonNode.remove(fieldName);
-            // Since the field is completely removed (name + value) we need to do a recalculation
-            needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber;
-          } else {
-            final ArrayNode arrayNode = (ArrayNode) node;
-            needToRemoveCharsNumber -= removeNodeElementsToSatisfyCharsLimit(arrayNode, needToRemoveCharsNumber);
-            if (arrayNode.size() == 0) {
-              // The field is empty, removing it because an empty array field may be misleading
-              // for the recipients of the truncated response context.
-              contextJsonNode.remove(fieldName);
-              // Since the field is completely removed (name + value) we need to do a recalculation
-              needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber;
-            }
-          } // node is not an array
-        } else {
-          // A context should not contain nulls so we completely remove the field.
+    }
+
+    int needToRemoveCharsNumber = fullSerializedString.length() - maxCharsNumber;
+    // Indicates that the context is truncated during serialization.
+    headerMap.put(Keys.TRUNCATED, true);
+    // Account for the extra field just added
+    needToRemoveCharsNumber += Keys.TRUNCATED.getName().length() + 7;

Review comment:
       Where is `7` from?

##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
##########
@@ -129,214 +373,373 @@
      *
      * @see org.apache.druid.query.Query#getMostSpecificId
      */
-    REMAINING_RESPONSES_FROM_QUERY_SERVERS(
+    public static final Key REMAINING_RESPONSES_FROM_QUERY_SERVERS = new AbstractKey(
         "remainingResponsesFromQueryServers",
-            (totalRemainingPerId, idAndNumResponses) -> {
-              final ConcurrentHashMap<String, Integer> map = (ConcurrentHashMap<String, Integer>) totalRemainingPerId;
-              final NonnullPair<String, Integer> pair = (NonnullPair<String, Integer>) idAndNumResponses;
-              map.compute(
-                  pair.lhs,
-                  (id, remaining) -> remaining == null ? pair.rhs : remaining + pair.rhs
-              );
-              return map;
-            }
-    ),
+        Visibility.NONE, true,
+        Object.class)
+    {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Object mergeValues(Object totalRemainingPerId, Object idAndNumResponses)
+      {
+        final ConcurrentHashMap<String, Integer> map = (ConcurrentHashMap<String, Integer>) totalRemainingPerId;
+        final NonnullPair<String, Integer> pair = (NonnullPair<String, Integer>) idAndNumResponses;
+        map.compute(
+            pair.lhs,
+            (id, remaining) -> remaining == null ? pair.rhs : remaining + pair.rhs);
+        return map;
+      }
+    };
+
     /**
      * Lists missing segments.
      */
-    MISSING_SEGMENTS(
+    public static final Key MISSING_SEGMENTS = new AbstractKey(
         "missingSegments",
-            (oldValue, newValue) -> {
-              final ArrayList<SegmentDescriptor> result = new ArrayList<SegmentDescriptor>((List) oldValue);
-              result.addAll((List) newValue);
-              return result;
-            }
-    ),
+        Visibility.HEADER, true,
+        new TypeReference<List<SegmentDescriptor>>() {})
+    {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Object mergeValues(Object oldValue, Object newValue)
+      {
+        final List<SegmentDescriptor> result = new ArrayList<SegmentDescriptor>((List<SegmentDescriptor>) oldValue);
+        result.addAll((List<SegmentDescriptor>) newValue);
+        return result;
+      }
+    };
+
     /**
      * Entity tag. A part of HTTP cache validation mechanism.
      * Is being removed from the context before sending and used as a separate HTTP header.
      */
-    ETAG("ETag"),
+    public static final Key ETAG = new StringKey("ETag", Visibility.NONE, true);
+
     /**
-     * Query fail time (current time + timeout).
-     * It is not updated continuously as {@link Key#TIMEOUT_AT}.
+     * Query total bytes gathered.
      */
-    QUERY_FAIL_DEADLINE_MILLIS("queryFailTime"),
+    public static final Key QUERY_TOTAL_BYTES_GATHERED = new AbstractKey(
+        "queryTotalBytesGathered",
+        Visibility.NONE, false,
+        new TypeReference<AtomicLong>() {})
+    {
+      @Override
+      public Object mergeValues(Object oldValue, Object newValue)
+      {
+        return ((AtomicLong) newValue).addAndGet(((AtomicLong) newValue).get());
+      }
+    };
+
     /**
-     * Query total bytes gathered.
+     * Query fail time (current time + timeout).
+     * It is not updated continuously as {@link Keys#TIMEOUT_AT}.
      */
-    QUERY_TOTAL_BYTES_GATHERED("queryTotalBytesGathered"),
+    public static final Key QUERY_FAIL_DEADLINE_MILLIS = new LongKey(
+        "queryFailTime",
+        Visibility.NONE);
+
     /**
      * This variable indicates when a running query should be expired,
      * and is effective only when 'timeout' of queryContext has a positive value.
      * Continuously updated by {@link org.apache.druid.query.scan.ScanQueryEngine}
      * by reducing its value on the time of every scan iteration.
      */
-    TIMEOUT_AT("timeoutAt"),
+    public static final Key TIMEOUT_AT = new LongKey(
+        "timeoutAt",
+        Visibility.NONE);
+
     /**
-     * The number of scanned rows.
-     * For backward compatibility the context key name still equals to "count".
+     * The number of rows scanned by {@link org.apache.druid.query.scan.ScanQueryEngine}.
+     *
+     * Named "count" for backwards compatibility with older data servers that still send this, even though it's now
+     * marked with {@link Visibility#NONE}.
      */
-    NUM_SCANNED_ROWS(
+    public static final Key NUM_SCANNED_ROWS = new CounterKey(
         "count",
-            (oldValue, newValue) -> ((Number) oldValue).longValue() + ((Number) newValue).longValue()
-    ),
+        Visibility.NONE);
+
     /**
      * The total CPU time for threads related to Sequence processing of the query.
      * Resulting value on a Broker is a sum of downstream values from historicals / realtime nodes.
      * For additional information see {@link org.apache.druid.query.CPUTimeMetricQueryRunner}
      */
-    CPU_CONSUMED_NANOS(
+    public static final Key CPU_CONSUMED_NANOS = new CounterKey(
         "cpuConsumed",
-            (oldValue, newValue) -> ((Number) oldValue).longValue() + ((Number) newValue).longValue()
-    ),
+        Visibility.NONE);
+
     /**
      * Indicates if a {@link ResponseContext} was truncated during serialization.
      */
-    TRUNCATED(
+    public static final Key TRUNCATED = new BooleanKey(
         "truncated",
-            (oldValue, newValue) -> (boolean) oldValue || (boolean) newValue
-    );
+        Visibility.HEADER);
+
+
+    /**
+     * One and only global list of keys. This is a semi-constant: it is mutable
+     * at start-up time, but then is not thread-safe, and must remain unchanged
+     * for the duration of the server run.
+     */
+    public static final Keys INSTANCE = new Keys();
 
     /**
      * ConcurrentSkipListMap is used to have the natural ordering of its keys.
-     * Thread-safe structure is required since there is no guarantee that {@link #registerKey(BaseKey)}
+     * Thread-safe structure is required since there is no guarantee that {@link #registerKey(Key)}
      * would be called only from class static blocks.
      */
-    private static final ConcurrentMap<String, BaseKey> REGISTERED_KEYS = new ConcurrentSkipListMap<>();
+    private ConcurrentMap<String, Key> registered_keys = new ConcurrentSkipListMap<>();

Review comment:
       ```suggestion
       private final ConcurrentMap<String, Key> registeredKeys = new ConcurrentSkipListMap<>();
   ```

##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
##########
@@ -30,93 +33,334 @@
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.NonnullPair;
-import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.BiFunction;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * The context for storing and passing data between chains of {@link org.apache.druid.query.QueryRunner}s.
  * The context is also transferred between Druid nodes with all the data it contains.
- */
+ * <p>
+ * The response context consists of a set of key/value pairs. Keys are those defined in
+ * the {@code Keys} registry. Keys are indexed by key instance, not by name. The
+ * key defines the type of the associated value, including logic to merge values and
+ * to deserialize JSON values for that key.
+ *
+ * <h4>Structure</h4>
+ * The context has evolved to perform multiple tasks. First, it holds two kinds
+ * of information:
+ * <ul>
+ * <li>Information to be returned in the query response header.
+ * (These are values tagged as {@code HEADER}.)</li>
+ * <li>Values passed within a single server. These are tagged with
+ * visibility {@code NONE}.)</li>
+ * </ul>
+ * Second, it performs multiple tasks:
+ * <ul>
+ * <li>Registers the keys to be used in the header. But, since it also holds
+ * internal information, the internal information also needs keys, though the
+ * corresponding values are never serialized.</li>
+ * <li>Gathers information for the query as a whole.</li>
+ * <li>Merges information back up the query tree: from multiple segments,
+ * from multiple servers, etc.</li>
+ * <li>Manages headers size by dropping fields when the header would get too
+ * large.</li>
+ * </ul>
+ *
+ * A result is that the information the context, when inspected by a calling
+ * query, may be incomplete if some of it was previously dropped by the
+ * called query.
+ *
+ * <h4>API</h4>
+ *
+ * The query profile needs to obtain the full, untruncated information. To do this
+ * it piggy-backs on the set operations to obtain the full value. To ensure this
+ * is possible, code that works with standard values should call the set (or add)
+ * functions provided which will do the needed map update.
+  */
 @PublicApi
 public abstract class ResponseContext
 {
   /**
    * The base interface of a response context key.
    * Should be implemented by every context key.
    */
-  public interface BaseKey
+  public interface Key
   {
+    /**
+     * Hack, pure and simple. The symbol "ResponseContext.Key.ETAG" must exist
+     * to avoid changing a line of code where we have no tests, which causes
+     * the build to fail. Remove this once the proper tests are added.
+     *
+     * @see {@link org.apache.druid.server.QueryResource#doPost}
+     */
+    public static final Key ETAG = Keys.ETAG;

Review comment:
       I would suggest to ignore the bot and delete this.

##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
##########
@@ -351,87 +754,100 @@ public void merge(ResponseContext responseContext)
   /**
    * Serializes the context given that the resulting string length is less than the provided limit.
    * This method removes some elements from context collections if it's needed to satisfy the limit.
-   * There is no explicit priorities of keys which values are being truncated because for now there are only
-   * two potential limit breaking keys ({@link Key#UNCOVERED_INTERVALS}
-   * and {@link Key#MISSING_SEGMENTS}) and their values are arrays.
-   * Thus current implementation considers these arrays as equal prioritized and starts removing elements from
+   * There is no explicit priorities of keys which values are being truncated.
+   * Any kind of key can be removed, the key's @{code canDrop()} attribute indicates
+   * which can be dropped. (The unit tests use a string key.)
+   * Thus keys as equally prioritized and starts removing elements from
    * the array which serialized value length is the biggest.
-   * The resulting string might be correctly deserialized to {@link ResponseContext}.
+   * The resulting string will be correctly deserialized to {@link ResponseContext}.
    */
-  public SerializationResult serializeWith(ObjectMapper objectMapper, int maxCharsNumber) throws JsonProcessingException
+  public SerializationResult serializeWith(ObjectMapper objectMapper, int maxCharsNumber)
+      throws JsonProcessingException
   {
-    final String fullSerializedString = objectMapper.writeValueAsString(getDelegate());
+    final Map<Key, Object> headerMap =
+        getDelegate().entrySet()
+                     .stream()
+                     .filter(entry -> entry.getKey().getPhase() == Visibility.HEADER)
+                     .collect(
+                         Collectors.toMap(
+                             Map.Entry::getKey,
+                             Map.Entry::getValue
+                         )
+                     );
+
+    final String fullSerializedString = objectMapper.writeValueAsString(headerMap);
     if (fullSerializedString.length() <= maxCharsNumber) {
       return new SerializationResult(null, fullSerializedString);
-    } else {
-      // Indicates that the context is truncated during serialization.
-      add(Key.TRUNCATED, true);
-      final ObjectNode contextJsonNode = objectMapper.valueToTree(getDelegate());
-      final ArrayList<Map.Entry<String, JsonNode>> sortedNodesByLength = Lists.newArrayList(contextJsonNode.fields());
-      sortedNodesByLength.sort(VALUE_LENGTH_REVERSED_COMPARATOR);
-      int needToRemoveCharsNumber = fullSerializedString.length() - maxCharsNumber;
-      // The complexity of this block is O(n*m*log(m)) where n - context size, m - context's array size
-      for (Map.Entry<String, JsonNode> e : sortedNodesByLength) {
-        final String fieldName = e.getKey();
-        final JsonNode node = e.getValue();
-        if (node.isArray()) {
-          if (needToRemoveCharsNumber >= node.toString().length()) {
-            // We need to remove more chars than the field's length so removing it completely
-            contextJsonNode.remove(fieldName);
-            // Since the field is completely removed (name + value) we need to do a recalculation
-            needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber;
-          } else {
-            final ArrayNode arrayNode = (ArrayNode) node;
-            needToRemoveCharsNumber -= removeNodeElementsToSatisfyCharsLimit(arrayNode, needToRemoveCharsNumber);
-            if (arrayNode.size() == 0) {
-              // The field is empty, removing it because an empty array field may be misleading
-              // for the recipients of the truncated response context.
-              contextJsonNode.remove(fieldName);
-              // Since the field is completely removed (name + value) we need to do a recalculation
-              needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber;
-            }
-          } // node is not an array
-        } else {
-          // A context should not contain nulls so we completely remove the field.
+    }
+
+    int needToRemoveCharsNumber = fullSerializedString.length() - maxCharsNumber;
+    // Indicates that the context is truncated during serialization.
+    headerMap.put(Keys.TRUNCATED, true);
+    // Account for the extra field just added
+    needToRemoveCharsNumber += Keys.TRUNCATED.getName().length() + 7;
+    final ObjectNode contextJsonNode = objectMapper.valueToTree(headerMap);
+    final List<Map.Entry<String, JsonNode>> sortedNodesByLength = Lists.newArrayList(contextJsonNode.fields());
+    sortedNodesByLength.sort(VALUE_LENGTH_REVERSED_COMPARATOR);
+    // The complexity of this block is O(n*m*log(m)) where n - context size, m - context's array size
+    for (Map.Entry<String, JsonNode> e : sortedNodesByLength) {
+      final String fieldName = e.getKey();
+      if (!Keys.instance().keyOf(fieldName).canDrop()) {
+        continue;
+      }
+      final JsonNode node = e.getValue();
+      int removeLength = fieldName.toString().length() + node.toString().length();

Review comment:
       ```suggestion
         int removeLength = fieldName.length() + node.toString().length();
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org