Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/12/01 02:02:58 UTC

[GitHub] [druid] gianm commented on a change in pull request #11828: Refactor ResponseContext

gianm commented on a change in pull request #11828:
URL: https://github.com/apache/druid/pull/11828#discussion_r759795681



##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
##########
@@ -30,93 +33,334 @@
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.NonnullPair;
-import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.BiFunction;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * The context for storing and passing data between chains of {@link org.apache.druid.query.QueryRunner}s.
  * The context is also transferred between Druid nodes with all the data it contains.
- */
+ * <p>
+ * The response context consists of a set of key/value pairs. Keys are those defined in
+ * the {@code Keys} registry. Keys are indexed by key instance, not by name. The
+ * key defines the type of the associated value, including logic to merge values and
+ * to deserialize JSON values for that key.
+ *
+ * <h4>Structure</h4>
+ * The context has evolved to perform multiple tasks. First, it holds two kinds
+ * of information:
+ * <ul>
+ * <li>Information to be returned in the query response header.
+ * (These are values tagged as {@code HEADER}.)</li>
+ * <li>Values passed within a single server.
+ * (These are tagged with visibility {@code NONE}.)</li>
+ * </ul>
+ * Second, it performs multiple tasks:
+ * <ul>
+ * <li>Registers the keys to be used in the header. But, since it also holds
+ * internal information, the internal information also needs keys, though the
+ * corresponding values are never serialized.</li>
+ * <li>Gathers information for the query as a whole.</li>
+ * <li>Merges information back up the query tree: from multiple segments,
+ * from multiple servers, etc.</li>
+ * <li>Manages header size by dropping fields when the header would get too
+ * large.</li>
+ * </ul>
+ *
+ * As a result, the information in the context, when inspected by a calling
+ * query, may be incomplete if some of it was previously dropped by the
+ * called query.
+ *
+ * <h4>API</h4>
+ *
+ * The query profile needs to obtain the full, untruncated information. To do this
+ * it piggy-backs on the set operations to obtain the full value. To ensure this
+ * is possible, code that works with standard values should call the set (or add)
+ * functions provided which will do the needed map update.
+  */
 @PublicApi
 public abstract class ResponseContext
 {
   /**
    * The base interface of a response context key.
    * Should be implemented by every context key.
    */
-  public interface BaseKey
+  public interface Key

Review comment:
       What do you all think about tagging this with `@UnstableApi` too? I think it's a good idea, because:
   
   - This is not a common extension point
   - We may be changing it more in the future as part of additional query trailer work
   
   So, thinking about compatibility in each of those changes will be a hassle without much benefit.
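
For illustration only, here is a rough sketch of what the tagged interface could look like. The `UnstableApi` annotation below is a local stand-in declared so the snippet compiles on its own (its retention and target are assumptions), and `ResponseContextSketch` with its single `getName()` method is hypothetical, not the class from this PR.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Local stand-in for the marker annotation; declared here only so the
// sketch is self-contained. Retention and target are assumptions.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface UnstableApi
{
}

// Hypothetical, trimmed-down outer class used only for this sketch.
final class ResponseContextSketch
{
  // The marker tells extension authors that this interface may change
  // between releases without the usual compatibility guarantees.
  @UnstableApi
  interface Key
  {
    String getName();
  }
}
```

The marker carries no behavior; its only purpose is to document that compatibility is not promised for this type.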

##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContextDeserializer.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.context;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+@SuppressWarnings("serial")
+public class ResponseContextDeserializer extends StdDeserializer<ResponseContext>
+{
+  public ResponseContextDeserializer()
+  {
+    super(ResponseContext.class);
+  }
+
+  @Override
+  public ResponseContext deserialize(
+      final JsonParser jp,
+      final DeserializationContext ctxt
+  ) throws IOException
+  {
+    if (jp.currentToken() != JsonToken.START_OBJECT) {
+      throw ctxt.wrongTokenException(jp, ResponseContext.class, JsonToken.START_OBJECT, null);
+    }
+
+    // TODO(gianm): Check if we need concurrent response context here

Review comment:
       The analysis here makes sense. I'd suggest adding a note to the ResponseContext javadocs to explain the situation to callers. Something like:
   
   > When deserialized using Jackson, a non-concurrent `{@link DefaultResponseContext}` will be generated. If you need a concurrent context, create a `{@link ConcurrentResponseContext}` and merge in the deserialized context.
   >
   > `@see ResponseContextDeserializer`
   
   Please also remove the TODO comment (we generally avoid checking those in; they're meant to be notes-to-self in one's own branches).
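
A minimal sketch of the "create a concurrent context and merge in the deserialized one" pattern, under loud assumptions: plain `HashMap` and `ConcurrentHashMap` stand in for the non-concurrent and concurrent contexts, and `ConcurrentContextDemo`, `toConcurrent`, and the `"count"` key are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in sketch: plain maps model the deserialized (single-threaded)
// context and the concurrent context. These are not the Druid classes.
final class ConcurrentContextDemo
{
  // Copies a single-threaded context into a thread-safe map before it is
  // shared across threads. Note: ConcurrentHashMap rejects null keys and values.
  static Map<String, Long> toConcurrent(Map<String, Long> deserialized)
  {
    final Map<String, Long> concurrent = new ConcurrentHashMap<>();
    concurrent.putAll(deserialized);
    return concurrent;
  }

  public static void main(String[] args) throws InterruptedException
  {
    final Map<String, Long> deserialized = new HashMap<>();
    deserialized.put("count", 1L);

    final Map<String, Long> shared = toConcurrent(deserialized);

    // Two threads update the same key; ConcurrentHashMap.merge is atomic,
    // so no increments are lost.
    final Runnable addOne = () -> {
      for (int i = 0; i < 1000; i++) {
        shared.merge("count", 1L, Long::sum);
      }
    };
    final Thread t1 = new Thread(addOne);
    final Thread t2 = new Thread(addOne);
    t1.start();
    t2.start();
    t1.join();
    t2.join();

    System.out.println(shared.get("count"));
  }
}
```

The point of the copy step is that the caller, not the deserializer, decides whether the extra cost of a concurrent structure is needed.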

##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContextDeserializer.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.context;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+/**
+ * Deserialize a response context. The response context is created for single-thread use.
+ * (That is, it is non-concurrent.) Clients of this code should convert the
+ * context to concurrent if it will be used across threads.
+ */
+@SuppressWarnings("serial")
+public class ResponseContextDeserializer extends StdDeserializer<ResponseContext>
+{
+  public ResponseContextDeserializer()
+  {
+    super(ResponseContext.class);
+  }
+
+  @Override
+  public ResponseContext deserialize(
+      final JsonParser jp,
+      final DeserializationContext ctxt
+  ) throws IOException
+  {
+    if (jp.currentToken() != JsonToken.START_OBJECT) {
+      throw ctxt.wrongTokenException(jp, ResponseContext.class, JsonToken.START_OBJECT, null);
+    }
+
+    // TODO(gianm): Check if we need concurrent response context here
+    final ResponseContext retVal = ResponseContext.createEmpty();
+
+    jp.nextToken();
+
+    ResponseContext.Keys keys = ResponseContext.Keys.instance();
+    while (jp.currentToken() == JsonToken.FIELD_NAME) {
+      final ResponseContext.Key key = keys.keyOf(jp.getText());

Review comment:
       I think we'll want some handling here for the case where a data server generates a key that the Broker hasn't heard of. It may happen when we add new keys. Skipping them seems reasonable.
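
A rough sketch of the skipping behavior, assuming a simplified model: the wire fields are represented as an already-parsed `Map` instead of a Jackson token stream, and `UnknownKeySkipDemo`, `KNOWN_KEYS`, and `readKnownFields` are hypothetical names. In the actual Jackson-based deserializer the value of an unknown field would also have to be consumed, for example by advancing to the value token and calling `JsonParser.skipChildren()`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Stand-in sketch: the real deserializer reads tokens from a JsonParser.
// Here the parsed fields are modelled as a Map so the idea runs on its own.
final class UnknownKeySkipDemo
{
  // Hypothetical registry of key names known to this server (illustrative values).
  static final Set<String> KNOWN_KEYS = new HashSet<>(Arrays.asList("count", "ETag"));

  // Copies only fields whose keys are registered. Unknown keys are skipped
  // instead of being treated as errors.
  static Map<String, Object> readKnownFields(Map<String, Object> wireFields)
  {
    final Map<String, Object> context = new LinkedHashMap<>();
    for (Map.Entry<String, Object> field : wireFields.entrySet()) {
      if (!KNOWN_KEYS.contains(field.getKey())) {
        // A newer data server may send a key this (older) Broker has never heard of.
        continue;
      }
      context.put(field.getKey(), field.getValue());
    }
    return context;
  }

  public static void main(String[] args)
  {
    final Map<String, Object> wire = new LinkedHashMap<>();
    wire.put("count", 3L);
    wire.put("someFutureKey", "ignored");
    System.out.println(readKnownFields(wire));
  }
}
```

Skipping rather than failing keeps rolling upgrades safe: a Broker on the older version can still read responses from data servers that already emit new keys.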




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
For additional commands, e-mail: commits-help@druid.apache.org