You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/01/04 11:36:02 UTC

[GitHub] [druid] cryptoe commented on a change in pull request #12078: Grouping on arrays as arrays

cryptoe commented on a change in pull request #12078:
URL: https://github.com/apache/druid/pull/12078#discussion_r778013977



##########
File path: processing/src/main/java/org/apache/druid/query/groupby/ResultRowDeserializer.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.data.ComparableStringArray;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class ResultRowDeserializer extends JsonDeserializer<ResultRow>
+{
+  final List<ColumnType> types;
+  final GroupByQuery query;
+
+  public ResultRowDeserializer(final List<ColumnType> types, final GroupByQuery query)
+  {
+    this.types = types;
+    this.query = query;
+  }
+
+  public static ResultRowDeserializer fromQuery(
+      final GroupByQuery query
+  )
+  {
+    RowSignature rowSignature = query.getResultRowSignature();
+    final List<ColumnType> types = new ArrayList<>(rowSignature.size());
+
+    for (String name : rowSignature.getColumnNames()) {
+      final ColumnType type = rowSignature.getColumnType(name)
+                                          .orElseThrow(() -> new ISE("No type for column [%s]", name));
+
+      types.add(type);
+    }
+
+    return new ResultRowDeserializer(types, query);
+
+  }
+
+  @Override
+  public ResultRow deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException
+  {
+    // Deserializer that can deserialize either array- or map-based rows.
+    if (jp.isExpectedStartObjectToken()) {
+      final Row row = jp.readValueAs(Row.class);
+      return ResultRow.fromLegacyRow(row, query);
+    } else if (jp.isExpectedStartArrayToken()) {
+      final Object[] retVal = new Object[types.size()];
+
+      for (int i = 0; i < types.size(); i++) {
+        final JsonToken token = jp.nextToken();
+        switch (types.get(i).getType()) {
+          case STRING:
+            if (token == JsonToken.VALUE_NULL) {
+              retVal[i] = null;
+            } else if (token == JsonToken.VALUE_STRING) {
+              retVal[i] = jp.getText();
+            } else {
+              throw ctxt.instantiationException(
+                  ResultRow.class,
+                  StringUtils.format("Unexpected token [%s] when reading string", token)
+              );
+            }
+            break;
+
+          case LONG:
+            retVal[i] = token == JsonToken.VALUE_NULL ? null : jp.getLongValue();
+            break;
+          case DOUBLE:
+            retVal[i] = token == JsonToken.VALUE_NULL ? null : jp.getDoubleValue();
+            break;
+          case FLOAT:
+            retVal[i] = token == JsonToken.VALUE_NULL ? null : jp.getFloatValue();
+            break;
+          case ARRAY:
+            if (types.get(i).equals(ColumnType.STRING_ARRAY)) {
+              final List<String> strings = new ArrayList<>();
+              while (jp.nextToken() != JsonToken.END_ARRAY) {
+                strings.add(jp.getText());
+              }
+              retVal[i] = ComparableStringArray.of(strings.toArray(new String[0]));
+              break;
+            }
+
+          default:
+            throw new ISE("Can't handle type [%s]", types.get(i).asTypeString());

Review comment:
       Fair enough. I can remove this class altogether. I added it initially to get stricter types while deserializing, but if we can have unknown types, then falling back on Jackson seems much better.

##########
File path: processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayGroupByColumnSelectorStrategy.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.column;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.groupby.epinephelinae.Grouper;
+import org.apache.druid.query.ordering.StringComparator;
+import org.apache.druid.query.ordering.StringComparators;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.data.ComparableIntArray;
+import org.apache.druid.segment.data.ComparableStringArray;
+import org.apache.druid.segment.data.IndexedInts;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class ArrayGroupByColumnSelectorStrategy
+    implements GroupByColumnSelectorStrategy
+{
+  private static final int GROUP_BY_MISSING_VALUE = -1;
+
+
+  // contains string <-> id for each element of the multi value grouping column
+  // for eg : [a,b,c] is the col value. dictionaryToInt will contain { a <-> 1, b <-> 2, c <-> 3}
+  private final BiMap<String, Integer> dictionaryToInt;
+
+  // stores each row as a integer array where the int represents the value in dictionaryToInt
+  // for eg : [a,b,c] would be converted to [1,2,3] and assigned a integer value 1.
+  // [1,2,3] <-> 1
+  private final BiMap<ComparableIntArray, Integer> intListToInt;
+
+  @Override
+  public int getGroupingKeySize()
+  {
+    return Integer.BYTES;
+  }
+
+  public ArrayGroupByColumnSelectorStrategy()
+  {
+    dictionaryToInt = HashBiMap.create();
+    intListToInt = HashBiMap.create();
+  }
+
+  @VisibleForTesting
+  ArrayGroupByColumnSelectorStrategy(
+      BiMap<String, Integer> dictionaryToInt,
+      BiMap<ComparableIntArray, Integer> intArrayToInt
+  )
+  {
+    this.dictionaryToInt = dictionaryToInt;
+    this.intListToInt = intArrayToInt;
+  }
+
+  @Override
+  public void processValueFromGroupingKey(
+      GroupByColumnSelectorPlus selectorPlus,
+      ByteBuffer key,
+      ResultRow resultRow,
+      int keyBufferPosition
+  )
+  {
+    final int id = key.getInt(keyBufferPosition);
+
+    // GROUP_BY_MISSING_VALUE is used to indicate empty rows
+    if (id != GROUP_BY_MISSING_VALUE) {
+      final int[] intRepresentation = intListToInt.inverse()
+                                                  .get(id).getDelegate();
+      final String[] stringRepresentaion = new String[intRepresentation.length];
+      for (int i = 0; i < intRepresentation.length; i++) {
+        stringRepresentaion[i] = dictionaryToInt.inverse().get(intRepresentation[i]);
+      }
+      resultRow.set(selectorPlus.getResultRowPosition(), ComparableStringArray.of(stringRepresentaion));
+    } else {
+      resultRow.set(selectorPlus.getResultRowPosition(), ComparableStringArray.of(NullHandling.defaultStringValues()));

Review comment:
       The initial thought was to make it similar to StringGroupByColumnSelectorStrategy. But thinking about it now, arrays need not have a default value and can just return null.
   Acked.

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/ArrayConstructorOperatorConversion.java
##########
@@ -45,6 +53,22 @@ public DruidExpression toDruidExpression(
       final RexNode rexNode
   )
   {
+    // Check if array needs to be unnested
+    if (plannerContext.getQueryContext()
+                      .getOrDefault(
+                          QueryContexts.ENABLE_UNNESTED_ARRAYS_KEY,
+                          QueryContexts.DEFAULT_ENABLE_UNNESTED_ARRAYS
+                      ).equals(Boolean.FALSE)) {
+      List<RexNode> nodes = ((RexCall) rexNode).getOperands();
+      Preconditions.checkArgument(
+          nodes == null || nodes.size() != 1,
+          "ARRAY[] should have exactly one argument"
+      );
+      if (nodes.get(0).getKind() == SqlKind.LITERAL) {
+        throw new UOE("ARRAY[] support for literals not implemented");
+      }
+      return Expressions.toDruidExpression(plannerContext, rowSignature, nodes.get(0));
+    }

Review comment:
       Makes sense. I have removed the array hook from here and pushed the logic to ArrayConstructorFunction. 

##########
File path: processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
##########
@@ -386,13 +393,16 @@ public static boolean canPushDownLimit(ColumnSelectorFactory columnSelectorFacto
     @Override
     public GroupByColumnSelectorStrategy makeColumnSelectorStrategy(
         ColumnCapabilities capabilities,
-        ColumnValueSelector selector
+        ColumnValueSelector selector,
+        DimensionSpec dimensionSpec
     )
     {
       switch (capabilities.getType()) {
         case STRING:

Review comment:
       I will be breaking up the PR into two phases.
   1. Add the string implementation first, but also have the generic building blocks in place.
   2. Add the implementation for the rest of the world.
   At this point, I am not sure whether one PR or two PRs will be necessary.

##########
File path: processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java
##########
@@ -376,11 +373,37 @@ public static Float convertObjectToFloat(@Nullable Object valObj, boolean report
         return convertObjectToDouble(obj, reportParseExceptions);
       case STRING:
         return convertObjectToString(obj);
+      case ARRAY:
+        return convertToArray(obj);

Review comment:
       Acked. Working on it.

##########
File path: processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
##########
@@ -1357,6 +1373,12 @@ private RowBasedKeySerdeHelper makeSerdeHelper(
     )
     {
       switch (valueType.getType()) {
+        case ARRAY:
+          return new ArrayRowBasedKeySerdeHelper(

Review comment:
       Acked. Working on it.

##########
File path: processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
##########
@@ -573,7 +572,7 @@ public void testResultSerde() throws Exception
         .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
         .setDimensions(Collections.singletonList(DefaultDimensionSpec.of("test")))
         .setAggregatorSpecs(Collections.singletonList(QueryRunnerTestHelper.ROWS_COUNT))
-        .setPostAggregatorSpecs(Collections.singletonList(new ConstantPostAggregator("post", 10)))
+        .setPostAggregatorSpecs(Collections.singletonList(new ConstantPostAggregator("post", 10.0)))

Review comment:
       Yup.

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
##########
@@ -403,7 +404,17 @@ private static Grouping computeGrouping(
       }
 
       final RelDataType dataType = rexNode.getType();
-      final ColumnType outputType = Calcites.getColumnTypeForRelDataType(dataType);
+      final ColumnType outputType;
+      if (plannerContext.getQueryContext()
+                        .getOrDefault(
+                            QueryContexts.ENABLE_UNNESTED_ARRAYS_KEY,
+                            QueryContexts.DEFAULT_ENABLE_UNNESTED_ARRAYS
+                        ).equals(Boolean.FALSE)) {
+        outputType = Calcites.getValueTypeForRelDataTypeFull(dataType);
+      } else {
+        outputType = Calcites.getColumnTypeForRelDataType(dataType);
+      }

Review comment:
       acked.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org