You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by "rohangarg (via GitHub)" <gi...@apache.org> on 2023/05/09 09:07:03 UTC

[GitHub] [druid] rohangarg commented on a diff in pull request #14058: Vectorized T-Digest aggregator

rohangarg commented on code in PR #14058:
URL: https://github.com/apache/druid/pull/14058#discussion_r1188310351


##########
extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestVectorizedAggregator.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.tdigestsketch;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class TDigestVectorizedAggregator implements VectorAggregator
+{
+  private final TDigestSketchAggregatorHelper innerAggregator;
+  private final VectorObjectSelector selector;
+
+  public TDigestVectorizedAggregator(Integer compression, VectorObjectSelector selector)
+  {
+    this.selector = selector;
+    this.innerAggregator = new TDigestSketchAggregatorHelper(compression == null
+                                                             ? TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION
+                                                             : compression);
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    // after this point the histogram is present in the cache
+    innerAggregator.init(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    Object[] vector = selector.getObjectVector();
+    for (int i = startRow; i < endRow; i++) {
+      Object other = vector[i];
+      innerAggregator.aggregate(other, buf, position);
+    }
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+  {
+    Object[] vector = selector.getObjectVector();
+    for (int i = 0; i < numRows; i++) {
+      Object other = vector[i];

Review Comment:
   should we use `rows != null ? vector[rows[i]] : vector[i]` here?



##########
extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestNumericVectorizedAggregator.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.tdigestsketch;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class TDigestNumericVectorizedAggregator implements VectorAggregator
+{
+  private final TDigestSketchAggregatorHelper innerAggregator;
+  private final VectorValueSelector selector;
+
+  public TDigestNumericVectorizedAggregator(Integer compression, VectorValueSelector selector)
+  {
+    innerAggregator = new TDigestSketchAggregatorHelper(compression == null
+                                                        ? TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION
+                                                        : compression);
+    this.selector = selector;
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    // after this point the histogram is present in the cache
+    innerAggregator.init(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    double[] vector = selector.getDoubleVector();
+    boolean[] isNull = selector.getNullVector();
+    for (int i = startRow; i < endRow; i++) {
+      double other = toObject(vector, isNull, i);
+      innerAggregator.aggregate(other, buf, position);
+    }
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+  {
+    double[] vector = selector.getDoubleVector();
+    boolean[] isNull = selector.getNullVector();
+    for (int i = 0; i < numRows; i++) {
+      int position = positions[i] + positionOffset;
+      int index = rows != null ? rows[i] : i;

Review Comment:
   should we use `rows != null ? vector[rows[i]] : vector[i]` here?



##########
extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestVectorizedAggregator.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.tdigestsketch;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class TDigestVectorizedAggregator implements VectorAggregator

Review Comment:
   can change to `TDigestMergeVectorAggregator` for consistency with other vector agg names



##########
extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestVectorizedAggregator.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.tdigestsketch;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class TDigestVectorizedAggregator implements VectorAggregator
+{
+  private final TDigestSketchAggregatorHelper innerAggregator;

Review Comment:
   nit: could be named `aggregatorHelper`



##########
extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestNumericVectorizedAggregator.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.tdigestsketch;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class TDigestNumericVectorizedAggregator implements VectorAggregator

Review Comment:
   can change to `TDigestNumericVectorAggregator` for consistency with other vector agg names



##########
extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestNumericVectorizedAggregator.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.tdigestsketch;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class TDigestNumericVectorizedAggregator implements VectorAggregator
+{
+  private final TDigestSketchAggregatorHelper innerAggregator;
+  private final VectorValueSelector selector;
+
+  public TDigestNumericVectorizedAggregator(Integer compression, VectorValueSelector selector)
+  {
+    innerAggregator = new TDigestSketchAggregatorHelper(compression == null
+                                                        ? TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION
+                                                        : compression);
+    this.selector = selector;
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    // after this point the histogram is present in the cache
+    innerAggregator.init(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    double[] vector = selector.getDoubleVector();
+    boolean[] isNull = selector.getNullVector();
+    for (int i = startRow; i < endRow; i++) {
+      double other = toObject(vector, isNull, i);
+      innerAggregator.aggregate(other, buf, position);

Review Comment:
   can we add a new method to `TDigestSketchAggregatorHelper` that directly aggregates `double` values, since the `MergingDigest` object allows adding `double` values?



##########
extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorFactory.java:
##########
@@ -312,4 +318,24 @@ public String toString()
            + ", compression=" + compression
            + "}";
   }
+
+  @Override
+  public boolean canVectorize(ColumnInspector columnInspector)
+  {
+    return true;
+  }
+
+  @Override
+  public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
+  {
+    final ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
+    ValueType type = capabilities.getType();
+    if (type == ValueType.COMPLEX) {

Review Comment:
   can we have a stricter check for type which checks that the column type is either numeric or `tDigestSketch`?



##########
extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestNumericVectorizedAggregator.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.tdigestsketch;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class TDigestNumericVectorizedAggregator implements VectorAggregator
+{
+  private final TDigestSketchAggregatorHelper innerAggregator;

Review Comment:
   nit: could be named `aggregatorHelper`



##########
extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorHelper.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.tdigestsketch;
+
+import com.tdunning.math.stats.MergingDigest;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.druid.java.util.common.IAE;
+
+import java.nio.ByteBuffer;
+import java.util.IdentityHashMap;
+
+public class TDigestSketchAggregatorHelper
+{
+  private final int compression;
+  private final IdentityHashMap<ByteBuffer, Int2ObjectMap<MergingDigest>> sketchCache = new IdentityHashMap();
+
+  public TDigestSketchAggregatorHelper(int compression)
+  {
+    this.compression = compression;
+  }
+
+  public void init(ByteBuffer buffer, int position)
+  {
+    MergingDigest emptyDigest = new MergingDigest(compression);

Review Comment:
   is it the case that the `MergingDigest` is always an on-heap object and that we use the buffer and position of buffer aggregators just to map that to the on-heap object?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org