Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/09/01 17:23:46 UTC

[GitHub] [druid] suneet-s opened a new pull request #10338: WIP vectorized ANY aggregators

suneet-s opened a new pull request #10338:
URL: https://github.com/apache/druid/pull/10338


   ### Description
   
   This patch provides a vectorized implementation of the `ANY` aggregators. Because these aggregators already read only one value from the column, the performance gain of a vectorized implementation over a non-vectorized one is expected to be minimal (it may even be slightly slower, since a whole batch of values is read).
   
   However, providing a vectorized implementation allows queries of the following shape to be vectorized:
   
   ```
   SELECT id, ANY_VALUE(name), SUM(number) from datasource group by id
   ```
   where we know there is a 1:1 mapping between id and name
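
As a rough illustration of the query shape above (plain Java, not Druid code; the class `AnyValueGroupBySketch` and its members are hypothetical names chosen for this sketch), an `ANY`-style aggregation keeps the first value it sees per group while `SUM` keeps accumulating. With a 1:1 mapping between `id` and `name`, every candidate value is the same, so the result does not depend on which row is picked:

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class AnyValueGroupBySketch
{
  /** Per-group state: the first name seen (the "ANY" value) and a running sum. */
  static final class GroupState
  {
    String anyName;
    long sum;
  }

  /** Groups rows by id, keeping any one name and the sum of the numbers for each id. */
  static Map<Integer, GroupState> aggregate(int[] ids, String[] names, long[] numbers)
  {
    Map<Integer, GroupState> groups = new LinkedHashMap<>();
    for (int row = 0; row < ids.length; row++) {
      GroupState state = groups.computeIfAbsent(ids[row], k -> new GroupState());
      // ANY keeps whichever value arrives first and ignores every later one.
      if (state.anyName == null) {
        state.anyName = names[row];
      }
      // SUM has to visit every row in the group.
      state.sum += numbers[row];
    }
    return groups;
  }

  public static void main(String[] args)
  {
    int[] ids = {1, 2, 1, 2};
    String[] names = {"a", "b", "a", "b"};
    long[] numbers = {10, 20, 30, 40};
    aggregate(ids, names, numbers).forEach(
        (id, state) -> System.out.println(id + " " + state.anyName + " " + state.sum)
    );
  }
}
```

The sketch processes rows one at a time for readability; it says nothing about how Druid batches rows internally.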
   
   Still WIP: the implementation is rough (lots of copy-paste that will be consolidated), and tests are missing. Pushing up the PR so I don't lose track of it locally.
   
   <hr>
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [x] added integration tests.
   - [ ] been tested in a test Druid cluster.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] suneet-s commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483681502



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);
+
+  /**
+   * Place the primitive null value in the buffer, given the initial offset position within the byte buffer

Review comment:
       Removed this function. Realized that it was just putting 0, which is what `initValue` does






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483453361



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);
+
+  /**
+   * Place the primitive null value in the buffer, given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putNonNullValue(ByteBuffer buf, int position, Object value);
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, replaceWithDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
+    initValue(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
+      boolean[] nulls = vectorValueSelector.getNullVector();
+      // check if there are any nulls
+      if (nulls != null && startRow <= nulls.length) {

Review comment:
       Is the `startRow <= nulls.length` check required? If so, it should be `startRow < nulls.length`, since `nulls.length` itself is not a valid index.
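
To make the boundary concrete, here is a minimal plain-Java sketch of a null-vector scan that uses the strict comparison. The helper `firstNullInRange` is hypothetical and not part of the PR or of Druid, and clamping `endRow` to the array length is an extra safety assumption of this sketch rather than something the PR does:

```java
final class NullVectorBoundsSketch
{
  /**
   * Returns the index of the first null row in [startRow, endRow), or -1 if there is none.
   * A missing null vector (null reference) is treated as "no row is null".
   */
  static int firstNullInRange(boolean[] nulls, int startRow, int endRow)
  {
    // Strict comparison: startRow == nulls.length is already one past the last valid index.
    if (nulls == null || startRow >= nulls.length) {
      return -1;
    }
    // Assumption of this sketch: never read beyond the end of the null vector.
    int end = Math.min(endRow, nulls.length);
    for (int i = startRow; i < end; i++) {
      if (nulls[i]) {
        return i;
      }
    }
    return -1;
  }

  public static void main(String[] args)
  {
    boolean[] nulls = {false, true, false};
    System.out.println(firstNullInRange(nulls, 0, 3)); // row 1 is null
    System.out.println(firstNullInRange(nulls, 2, 3)); // no null in the range
    System.out.println(firstNullInRange(nulls, 3, 4)); // startRow is out of range
  }
}
```

With a `<=` guard, the last call would enter the loop and read `nulls[3]`, which throws `ArrayIndexOutOfBoundsException` for an array of length 3.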






[GitHub] [druid] suneet-s commented on a change in pull request #10338: WIP vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483319566



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  private static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   * @return true if a value was put on the buffer, false otherwise.
+   */
+  abstract boolean putValue(ByteBuffer buf, int position, int startRow, int endRow);
+
+  /**
+   * Place the primitive null value in the buffer, given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putNonNullValue(ByteBuffer buf, int position, Object value);
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, replaceWithDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
+    initValue(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
+      boolean[] nulls = vectorValueSelector.getNullVector();
+      // check if there are any nulls
+      if (nulls != null && startRow <= nulls.length) {
+        for (int i = startRow; i < endRow; i++) {
+          // And there is actually a null
+          if (nulls[i]) {
+            putValue(buf, position, null);
+            return;
+          }
+        }
+      }
+      // There are no nulls, so put a value from the value selector
+      if (putValue(buf, position, startRow, endRow)) {
+        buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE));
+      }
+    }
+  }
+
+  @Override
+  public void aggregate(
+      ByteBuffer buf,
+      int numRows,
+      int[] positions,
+      @Nullable int[] rows,
+      int positionOffset
+  )
+  {
+    int prevPosition = -1;
+    @Nullable Object theValue = null;
+    boolean found = false;
+    for (int i = 0; i < numRows; i++) {
+      int position = positions[i] + positionOffset;
+      int row = rows == null ? i : rows[i];
+      // If the aggregate is not found at the position
+      if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
+        // If there's a value at the previous position, use it in this position.
+        if (prevPosition >= 0 && (found || (buf.get(prevPosition) & BYTE_FLAG_FOUND_MASK) == BYTE_FLAG_FOUND_MASK)) {

Review comment:
       This approach doesn't work, and one of the CalciteQueryTests caught it 🎉 






[GitHub] [druid] abhishekagarwal87 commented on pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on pull request #10338:
URL: https://github.com/apache/druid/pull/10338#issuecomment-691457926








[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483477223



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;

Review comment:
       They would also be free of any null handling.






[GitHub] [druid] abhishekagarwal87 commented on pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on pull request #10338:
URL: https://github.com/apache/druid/pull/10338#issuecomment-688196270


   > I tried that earlier and it failed a test with subqueries - #10338 (comment)
   
   Oh, I understand now.




[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483476704



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;

Review comment:
       Again nitpicking, but this can be made private. In inherited classes, instead of passing `position`, you can pass `position + FOUND_VALUE_OFFSET`, so that those methods are free of any offset-related logic.
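
One way to read this suggestion, as a self-contained sketch rather than the PR's actual code: the base class owns the flag byte and the offset, and subclasses receive a position that already points at the value. The class names `OffsetHidingAggregatorSketch` and `LongAnySketch`, and the methods `aggregateRow` and `readLong`, are hypothetical; only `FOUND_VALUE_OFFSET` and the `0x02` found flag are taken from the diff above.

```java
import java.nio.ByteBuffer;

abstract class OffsetHidingAggregatorSketch
{
  // Private: subclasses never need to know that a flag byte precedes the value.
  private static final int FOUND_VALUE_OFFSET = Byte.BYTES;
  private static final byte FOUND_FLAG = 0x02;

  /** Writes the value for the given row starting exactly at valuePosition. */
  abstract void putValue(ByteBuffer buf, int valuePosition, int row);

  /** Stores the row's value only if no value has been stored at this position yet. */
  final void aggregateRow(ByteBuffer buf, int position, int row)
  {
    if ((buf.get(position) & FOUND_FLAG) == 0) {
      // The base class applies the offset once, on behalf of every subclass.
      putValue(buf, position + FOUND_VALUE_OFFSET, row);
      buf.put(position, (byte) (buf.get(position) | FOUND_FLAG));
    }
  }

  /** Reads back a stored long value, skipping the flag byte. */
  final long readLong(ByteBuffer buf, int position)
  {
    return buf.getLong(position + FOUND_VALUE_OFFSET);
  }
}

final class LongAnySketch extends OffsetHidingAggregatorSketch
{
  private final long[] vector;

  LongAnySketch(long[] vector)
  {
    this.vector = vector;
  }

  @Override
  void putValue(ByteBuffer buf, int valuePosition, int row)
  {
    // No offset arithmetic here: the caller already points at the value slot.
    buf.putLong(valuePosition, vector[row]);
  }

  public static void main(String[] args)
  {
    LongAnySketch agg = new LongAnySketch(new long[]{7L, 8L, 9L});
    ByteBuffer buf = ByteBuffer.allocate(Byte.BYTES + Long.BYTES);
    agg.aggregateRow(buf, 0, 1);
    agg.aggregateRow(buf, 0, 2); // ignored, a value has already been found
    System.out.println(agg.readLong(buf, 0));
  }
}
```

The trade-off is that every subclass method now takes a "value position" rather than an "aggregate position", so the parameter naming has to make that distinction clear.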






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483460415



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);
+
+  /**
+   * Place the primitive null value in the buffer, given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putNonNullValue(ByteBuffer buf, int position, Object value);
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, replaceWithDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
+    initValue(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
+      boolean[] nulls = vectorValueSelector.getNullVector();
+      // check if there are any nulls
+      if (nulls != null && startRow <= nulls.length) {
+        for (int i = startRow; i < endRow; i++) {
+          // And there is actually a null
+          if (nulls[i]) {
+            putNull(buf, position);
+            return;
+          }
+        }
+      }
+      // There are no nulls, so put a value from the value selector
+      putValue(buf, position, startRow);
+      buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE));
+    }
+  }
+
+  @Override
+  public void aggregate(

Review comment:
       Are there unit tests for this method?






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483475430



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);

Review comment:
       nit: how about `putNonNullRow`? 






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483476704



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;

Review comment:
       Another nit: this can be made private. The protected methods could take `position + FOUND_VALUE_OFFSET` instead of `position`, which keeps them free of any offset logic. 
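
A minimal sketch of the suggestion above, assuming a one-byte flag followed by the value. The class names `OffsetHidingAggregator` and `DoubleOffsetHidingAggregator` are hypothetical and are not part of the patch; only `FOUND_VALUE_OFFSET` and `initValue` come from the quoted code.

```java
import java.nio.ByteBuffer;

// Hypothetical base class: only this class knows that the value sits one byte after the flag.
abstract class OffsetHidingAggregator
{
  private static final int FOUND_VALUE_OFFSET = Byte.BYTES;

  // Subclasses receive the exact position of the value, with the offset already applied.
  abstract void initValue(ByteBuffer buf, int valuePosition);

  public void init(ByteBuffer buf, int position)
  {
    // Flag byte first (zero here purely for illustration), then the value right after it.
    buf.put(position, (byte) 0);
    initValue(buf, position + FOUND_VALUE_OFFSET);
  }
}

// Hypothetical subclass storing a double; it contains no offset arithmetic at all.
final class DoubleOffsetHidingAggregator extends OffsetHidingAggregator
{
  @Override
  void initValue(ByteBuffer buf, int valuePosition)
  {
    buf.putDouble(valuePosition, 0.0d);
  }
}
```

With this shape, changing the flag layout would only touch the base class.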






[GitHub] [druid] suneet-s commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483674764



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);
+
+  /**
+   * Place the primitive null value in the buffer, fiven the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putNonNullValue(ByteBuffer buf, int position, Object value);
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, replaceWithDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
+    initValue(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
+      boolean[] nulls = vectorValueSelector.getNullVector();
+      // check if there are any nulls
+      if (nulls != null && startRow <= nulls.length) {

Review comment:
       Fixed.

##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);
+
+  /**
+   * Place the primitive null value in the buffer, fiven the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putNonNullValue(ByteBuffer buf, int position, Object value);
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, replaceWithDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
+    initValue(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
+      boolean[] nulls = vectorValueSelector.getNullVector();
+      // check if there are any nulls
+      if (nulls != null && startRow <= nulls.length) {
+        for (int i = startRow; i < endRow; i++) {
+          // And there is actually a null
+          if (nulls[i]) {
+            putNull(buf, position);
+            return;
+          }
+        }
+      }
+      // There are no nulls, so put a value from the value selector
+      putValue(buf, position, startRow);
+      buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE));
+    }
+  }
+
+  @Override
+  public void aggregate(
+      ByteBuffer buf,
+      int numRows,
+      int[] positions,
+      @Nullable int[] rows,
+      int positionOffset
+  )
+  {
+    for (int i = 0; i < numRows; i++) {
+      int position = positions[i] + positionOffset;
+      int row = rows == null ? i : rows[i];
+      aggregate(buf, position, row, row);

Review comment:
       yes 😬 

##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/StringAnyAggregatorFactory.java
##########
@@ -77,6 +81,33 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
     return new StringAnyBufferAggregator(metricFactory.makeColumnValueSelector(fieldName), maxStringBytes);
   }
 
+  @Override
+  public StringAnyVectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
+  {
+
+    ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
+    if (capabilities == null || capabilities.hasMultipleValues().isMaybeTrue()) {
+      return new StringAnyVectorAggregator(
+          null,
+          selectorFactory.makeMultiValueDimensionSelector(DefaultDimensionSpec.of(fieldName)),
+          maxStringBytes
+      );
+    } else {
+      return new StringAnyVectorAggregator(
+          selectorFactory.makeSingleValueDimensionSelector(DefaultDimensionSpec.of(fieldName)),
+          null,
+          maxStringBytes
+      );
+    }
+  }
+
+  @Override
+  public boolean canVectorize(ColumnInspector columnInspector)
+  {
+    ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
+    return capabilities == null || capabilities.getType() == ValueType.STRING;

Review comment:
       ```
    /**
      * Returns capabilities of a particular column, if known. May be null if the column doesn't exist, or if
      * the column does exist but the capabilities are unknown. The latter is possible with dynamically discovered
      * columns.
      *
      * Note that StorageAdapters are representations of "real" segments, so they are not aware of any virtual columns
      * that may be involved in a query. In general, query engines should instead use the method
      * {@link ColumnSelectorFactory#getColumnCapabilities(String)}, which returns capabilities for virtual columns as
      * well.
      *
      * @param column column name
      *
      * @return capabilities, or null
      */
     @Override
     @Nullable
     ColumnCapabilities getColumnCapabilities(String column);
   ```
   
   I found this explanation in the Javadoc for `StorageAdapter#getColumnCapabilities`.
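
A small sketch of how the null case described in that Javadoc plays out in a `canVectorize`-style check. `SketchValueType` and `SketchCapabilities` are hypothetical stand-ins for Druid's `ValueType` and `ColumnCapabilities`, so this only mirrors the condition in the quoted diff and is not the real API.

```java
// Hypothetical stand-in for Druid's ValueType.
enum SketchValueType
{
  STRING, LONG, DOUBLE
}

// Hypothetical stand-in for ColumnCapabilities; only the type is modelled.
final class SketchCapabilities
{
  final SketchValueType type;

  SketchCapabilities(SketchValueType type)
  {
    this.type = type;
  }
}

final class CanVectorizeSketch
{
  // Null capabilities mean the column is missing or its capabilities are unknown,
  // so the check cannot rule out vectorization on type grounds.
  static boolean canVectorize(SketchCapabilities capabilities)
  {
    return capabilities == null || capabilities.type == SketchValueType.STRING;
  }
}
```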

##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);

Review comment:
       Refactored to `boolean putAnyValueFromRow(ByteBuffer buf, int position, int startRow, int endRow)`
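
The refactored method body is not shown in this thread, so the following is only a guess at its shape. It assumes the method scans the half-open range `[startRow, endRow)` and writes the first non-null value it finds. The `values` and `nulls` parameters are added here for self-containment; the signature above presumably reads them from the selector instead.

```java
import java.nio.ByteBuffer;

final class PutAnyValueSketch
{
  // Writes the first non-null value in [startRow, endRow) at position.
  // Returns true if a value was written, false if every row in the range was null.
  static boolean putAnyValueFromRow(
      ByteBuffer buf,
      int position,
      double[] values,
      boolean[] nulls,
      int startRow,
      int endRow
  )
  {
    for (int row = startRow; row < endRow; row++) {
      // A null `nulls` array is taken to mean that no row is null.
      if (nulls == null || !nulls[row]) {
        buf.putDouble(position, values[row]);
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args)
  {
    ByteBuffer buf = ByteBuffer.allocate(8);
    boolean found = putAnyValueFromRow(
        buf, 0, new double[]{1.5, 2.5}, new boolean[]{true, false}, 0, 2
    );
    System.out.println(found + " " + buf.getDouble(0));
  }
}
```

Returning a boolean lets the caller decide whether to set the found flag, which keeps the flag handling in one place.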

##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);
+
+  /**
+   * Place the primitive null value in the buffer, fiven the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putNonNullValue(ByteBuffer buf, int position, Object value);
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, replaceWithDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
+    initValue(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
+      boolean[] nulls = vectorValueSelector.getNullVector();
+      // check if there are any nulls
+      if (nulls != null && startRow <= nulls.length) {
+        for (int i = startRow; i < endRow; i++) {
+          // And there is actually a null
+          if (nulls[i]) {
+            putNull(buf, position);
+            return;
+          }
+        }
+      }
+      // There are no nulls, so put a value from the value selector
+      putValue(buf, position, startRow);
+      buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE));
+    }
+  }
+
+  @Override
+  public void aggregate(
+      ByteBuffer buf,
+      int numRows,
+      int[] positions,
+      @Nullable int[] rows,
+      int positionOffset
+  )
+  {
+    for (int i = 0; i < numRows; i++) {
+      int position = positions[i] + positionOffset;
+      int row = rows == null ? i : rows[i];
+      aggregate(buf, position, row, row);

Review comment:
       I tried that earlier and it failed a test with subqueries - https://github.com/apache/druid/pull/10338#discussion_r483319566

##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);
+
+  /**
+   * Place the primitive null value in the buffer, fiven the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putNonNullValue(ByteBuffer buf, int position, Object value);
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, replaceWithDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
+    initValue(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
+      boolean[] nulls = vectorValueSelector.getNullVector();
+      // check if there are any nulls
+      if (nulls != null && startRow <= nulls.length) {
+        for (int i = startRow; i < endRow; i++) {
+          // And there is actually a null
+          if (nulls[i]) {
+            putNull(buf, position);
+            return;
+          }
+        }
+      }
+      // There are no nulls, so put a value from the value selector
+      putValue(buf, position, startRow);
+      buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE));
+    }
+  }
+
+  @Override
+  public void aggregate(

Review comment:
       Added now. This was still in progress when I pushed the last patch.

##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);
+
+  /**
+   * Place the primitive null value in the buffer, fiven the initial offset position within the byte buffer

Review comment:
       Updated the function to just `putZero`.

##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;

Review comment:
       good suggestion! Done.






[GitHub] [druid] suneet-s commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r488280699



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericNilVectorAggregator.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * A vector aggregator that returns the default numeric value.
+ */
+public abstract class NumericNilVectorAggregator implements VectorAggregator

Review comment:
       I decided not to extend `NoopVectorAggregator` since its constructor is private, and I didn't want to think through what the right inheritance model should be. I did take your suggestion to instantiate a single instance for each numeric type, which reduces the boilerplate code. New patch incoming...
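
A rough sketch of the one-instance-per-numeric-type idea, not the code from the patch. `NilAggregatorSketch` is a hypothetical one-method stand-in for the real aggregator interface, and the zero defaults assume the replace-with-default null handling mode.

```java
// Hypothetical minimal interface standing in for the real aggregator contract.
interface NilAggregatorSketch
{
  Object get();
}

final class NumericNilSketch implements NilAggregatorSketch
{
  // One shared, stateless instance per numeric type.
  private static final NumericNilSketch DOUBLE_NIL = new NumericNilSketch(0.0d);
  private static final NumericNilSketch FLOAT_NIL = new NumericNilSketch(0.0f);
  private static final NumericNilSketch LONG_NIL = new NumericNilSketch(0L);

  private final Object defaultValue;

  // Private constructor: callers go through the static accessors below.
  private NumericNilSketch(Object defaultValue)
  {
    this.defaultValue = defaultValue;
  }

  static NumericNilSketch doubleNil()
  {
    return DOUBLE_NIL;
  }

  static NumericNilSketch floatNil()
  {
    return FLOAT_NIL;
  }

  static NumericNilSketch longNil()
  {
    return LONG_NIL;
  }

  @Override
  public Object get()
  {
    return defaultValue;
  }
}
```

Since the instances hold no mutable state, sharing them avoids a separate subclass per numeric type.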






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r484315121



##########
File path: processing/src/test/java/org/apache/druid/query/aggregation/any/DoubleAnyVectorAggregatorTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.mockito.Mockito.spy;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DoubleAnyVectorAggregatorTest extends InitializedNullHandlingTest
+{
+  private static final int NULL_POSITION = 32;
+  private static final int POSITION = 2;
+  private static final double EPSILON = 1e-15;
+  private static final double[] VALUES = new double[]{7.8d, 11, 23.67, 60, 123};
+
+  private ByteBuffer buf;
+  @Mock
+  private VectorValueSelector selector;
+
+  private DoubleAnyVectorAggregator target;
+
+  @Before
+  public void setUp()
+  {
+    byte[] randomBytes = new byte[128];
+    ThreadLocalRandom.current().nextBytes(randomBytes);
+    buf = ByteBuffer.wrap(randomBytes);
+    Mockito.doReturn(VALUES).when(selector).getDoubleVector();
+
+    target = spy(new DoubleAnyVectorAggregator(selector));
+    Mockito.when(target.isValueNull(buf, NULL_POSITION)).thenReturn(true);
+    Mockito.when(target.isValueNull(buf, POSITION)).thenReturn(false);
+  }
+
+  @Test
+  public void initValueShouldInitZero()
+  {
+    target.initValue(buf, POSITION);
+    Assert.assertEquals(0, buf.getDouble(POSITION), EPSILON);
+  }
+
+  @Test
+  public void getAtPositionIsNullShouldReturnNull()
+  {
+    Assert.assertNull(target.get(buf, NULL_POSITION));
+  }
+
+  @Test
+  public void getAtPositionShouldReturnValue()
+  {
+    buf.putDouble(POSITION + 1, VALUES[3]);

Review comment:
       This got me thinking about how this test passed. No flag was set in the buffer, yet the target still returned the value. I realized that in `NumericAnyVectorAggregator`, `non-null` is the default rather than `null`:
   ```
   boolean isValueNull(ByteBuffer buf, int position)
     {
       return (buf.get(position) & BYTE_FLAG_NULL_MASK) == NullHandling.IS_NULL_BYTE;
     }
   ```
   It should be fine since `init` is always called. Maybe the tests should do the same.
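
The point about the default can be illustrated with a minimal, self-contained sketch. The constants below are hypothetical stand-ins for `NullHandling.IS_NULL_BYTE` and `NullHandling.IS_NOT_NULL_BYTE`; their values (1 and 0) are an assumption made for illustration, and `NullFlagSketch` is not a class from the PR. Under that assumption, a zeroed flag byte reads as not null until `init` writes the flag.

```java
import java.nio.ByteBuffer;

final class NullFlagSketch {
    // Assumed values, for illustration only; not taken from the PR.
    static final byte IS_NULL_BYTE = 1;
    static final byte IS_NOT_NULL_BYTE = 0;
    static final byte NULL_MASK = 0x01;

    // Same shape as the isValueNull check quoted in the review comment.
    static boolean isValueNull(ByteBuffer buf, int position) {
        return (buf.get(position) & NULL_MASK) == IS_NULL_BYTE;
    }

    // Writes the flag byte the way an init step would.
    static void init(ByteBuffer buf, int position, boolean replaceWithDefault) {
        buf.put(position, replaceWithDefault ? IS_NOT_NULL_BYTE : IS_NULL_BYTE);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16); // zero-filled by allocate
        System.out.println(isValueNull(buf, 2));  // false: no flag written yet
        init(buf, 2, false);
        System.out.println(isValueNull(buf, 2));  // true: init marked the slot as null
    }
}
```

A test that calls `init` before reading would exercise the same path as production code instead of relying on whatever the buffer happens to contain.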




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r484318256



##########
File path: processing/src/test/java/org/apache/druid/query/aggregation/any/DoubleAnyVectorAggregatorTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.mockito.Mockito.spy;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DoubleAnyVectorAggregatorTest extends InitializedNullHandlingTest
+{
+  private static final int NULL_POSITION = 32;
+  private static final int POSITION = 2;
+  private static final double EPSILON = 1e-15;
+  private static final double[] VALUES = new double[]{7.8d, 11, 23.67, 60, 123};
+
+  private ByteBuffer buf;
+  @Mock
+  private VectorValueSelector selector;
+
+  private DoubleAnyVectorAggregator target;
+
+  @Before
+  public void setUp()
+  {
+    byte[] randomBytes = new byte[128];
+    ThreadLocalRandom.current().nextBytes(randomBytes);
+    buf = ByteBuffer.wrap(randomBytes);
+    Mockito.doReturn(VALUES).when(selector).getDoubleVector();
+
+    target = spy(new DoubleAnyVectorAggregator(selector));
+    Mockito.when(target.isValueNull(buf, NULL_POSITION)).thenReturn(true);
+    Mockito.when(target.isValueNull(buf, POSITION)).thenReturn(false);
+  }
+
+  @Test
+  public void initValueShouldInitZero()
+  {
+    target.initValue(buf, POSITION);
+    Assert.assertEquals(0, buf.getDouble(POSITION), EPSILON);
+  }
+
+  @Test
+  public void getAtPositionIsNullShouldReturnNull()
+  {
+    Assert.assertNull(target.get(buf, NULL_POSITION));
+  }
+
+  @Test
+  public void getAtPositionShouldReturnValue()
+  {
+    buf.putDouble(POSITION + 1, VALUES[3]);

Review comment:
       `Mockito.when(target.isValueNull(buf, POSITION)).thenReturn(false);` should be taking care of it. 






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483478720



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/StringAnyAggregatorFactory.java
##########
@@ -77,6 +81,33 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
     return new StringAnyBufferAggregator(metricFactory.makeColumnValueSelector(fieldName), maxStringBytes);
   }
 
+  @Override
+  public StringAnyVectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
+  {
+
+    ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
+    if (capabilities == null || capabilities.hasMultipleValues().isMaybeTrue()) {
+      return new StringAnyVectorAggregator(
+          null,
+          selectorFactory.makeMultiValueDimensionSelector(DefaultDimensionSpec.of(fieldName)),
+          maxStringBytes
+      );
+    } else {
+      return new StringAnyVectorAggregator(
+          selectorFactory.makeSingleValueDimensionSelector(DefaultDimensionSpec.of(fieldName)),
+          null,
+          maxStringBytes
+      );
+    }
+  }
+
+  @Override
+  public boolean canVectorize(ColumnInspector columnInspector)
+  {
+    ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
+    return capabilities == null || capabilities.getType() == ValueType.STRING;

Review comment:
       Out of curiosity, when can `capabilities` be null?






[GitHub] [druid] suneet-s commented on pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
suneet-s commented on pull request #10338:
URL: https://github.com/apache/druid/pull/10338#issuecomment-686829622


   Ready for review, but I'm still adding tests




[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483458719



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);
+
+  /**
+   * Place the primitive null value in the buffer, given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putNonNullValue(ByteBuffer buf, int position, Object value);
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, replaceWithDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
+    initValue(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
+      boolean[] nulls = vectorValueSelector.getNullVector();
+      // check if there are any nulls
+      if (nulls != null && startRow <= nulls.length) {
+        for (int i = startRow; i < endRow; i++) {
+          // And there is actually a null
+          if (nulls[i]) {
+            putNull(buf, position);
+            return;
+          }
+        }
+      }
+      // There are no nulls, so put a value from the value selector
+      putValue(buf, position, startRow);
+      buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE));
+    }
+  }
+
+  @Override
+  public void aggregate(
+      ByteBuffer buf,
+      int numRows,
+      int[] positions,
+      @Nullable int[] rows,
+      int positionOffset
+  )
+  {
+    for (int i = 0; i < numRows; i++) {
+      int position = positions[i] + positionOffset;
+      int row = rows == null ? i : rows[i];
+      aggregate(buf, position, row, row);

Review comment:
       Should this not be `aggregate(buf, position, row, row + 1)`?
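
A small sketch of why the end index matters, assuming `endRow` is exclusive as the loop `for (int i = startRow; i < endRow; i++)` in the diff suggests. `HalfOpenRangeSketch` and `rowsVisited` are hypothetical names used only for illustration.

```java
final class HalfOpenRangeSketch {
    // Counts the rows visited by a loop over the half-open range [startRow, endRow).
    static int rowsVisited(int startRow, int endRow) {
        int visited = 0;
        for (int i = startRow; i < endRow; i++) {
            visited++;
        }
        return visited;
    }

    public static void main(String[] args) {
        int row = 3;
        System.out.println(rowsVisited(row, row));     // 0: the range [3, 3) is empty
        System.out.println(rowsVisited(row, row + 1)); // 1: the range [3, 4) covers exactly row 3
    }
}
```

With `(row, row)` the null-check loop never inspects the row, so a null at that row would go unnoticed; `(row, row + 1)` covers exactly one row.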






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r485040542



##########
File path: processing/src/test/java/org/apache/druid/query/aggregation/any/DoubleAnyVectorAggregatorTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.mockito.Mockito.spy;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DoubleAnyVectorAggregatorTest extends InitializedNullHandlingTest
+{
+  private static final int NULL_POSITION = 32;
+  private static final int POSITION = 2;
+  private static final double EPSILON = 1e-15;
+  private static final double[] VALUES = new double[]{7.8d, 11, 23.67, 60, 123};
+
+  private ByteBuffer buf;
+  @Mock
+  private VectorValueSelector selector;
+
+  private DoubleAnyVectorAggregator target;
+
+  @Before
+  public void setUp()
+  {
+    byte[] randomBytes = new byte[128];
+    ThreadLocalRandom.current().nextBytes(randomBytes);
+    buf = ByteBuffer.wrap(randomBytes);
+    Mockito.doReturn(VALUES).when(selector).getDoubleVector();
+
+    target = spy(new DoubleAnyVectorAggregator(selector));
+    Mockito.when(target.isValueNull(buf, NULL_POSITION)).thenReturn(true);
+    Mockito.when(target.isValueNull(buf, POSITION)).thenReturn(false);
+  }
+
+  @Test
+  public void initValueShouldInitZero()
+  {
+    target.initValue(buf, POSITION);
+    Assert.assertEquals(0, buf.getDouble(POSITION), EPSILON);
+  }
+
+  @Test
+  public void getAtPositionIsNullShouldReturnNull()
+  {
+    Assert.assertNull(target.get(buf, NULL_POSITION));
+  }
+
+  @Test
+  public void getAtPositionShouldReturnValue()
+  {
+    buf.putDouble(POSITION + 1, VALUES[3]);
+    Assert.assertEquals(VALUES[3], (double) target.get(buf, POSITION), EPSILON);
+  }
+
+  @Test
+  public void putValueShouldAddToBuffer()
+  {
+    Assert.assertTrue(target.putAnyValueFromRow(buf, POSITION, 2, 3));
+    Assert.assertEquals(VALUES[2], buf.getDouble(POSITION), EPSILON);
+  }
+
+  @Test
+  public void putValueStartAfterEndShouldNotAddToBuffer()
+  {
+    Assert.assertFalse(target.putAnyValueFromRow(buf, POSITION, 2, 2));
+    Assert.assertNotEquals(VALUES[2], buf.getDouble(POSITION));

Review comment:
       Looks good to me otherwise.  






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r484316088



##########
File path: processing/src/test/java/org/apache/druid/query/aggregation/any/DoubleAnyVectorAggregatorTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.mockito.Mockito.spy;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DoubleAnyVectorAggregatorTest extends InitializedNullHandlingTest
+{
+  private static final int NULL_POSITION = 32;
+  private static final int POSITION = 2;
+  private static final double EPSILON = 1e-15;
+  private static final double[] VALUES = new double[]{7.8d, 11, 23.67, 60, 123};
+
+  private ByteBuffer buf;
+  @Mock
+  private VectorValueSelector selector;
+
+  private DoubleAnyVectorAggregator target;
+
+  @Before
+  public void setUp()
+  {
+    byte[] randomBytes = new byte[128];
+    ThreadLocalRandom.current().nextBytes(randomBytes);
+    buf = ByteBuffer.wrap(randomBytes);
+    Mockito.doReturn(VALUES).when(selector).getDoubleVector();
+
+    target = spy(new DoubleAnyVectorAggregator(selector));
+    Mockito.when(target.isValueNull(buf, NULL_POSITION)).thenReturn(true);
+    Mockito.when(target.isValueNull(buf, POSITION)).thenReturn(false);
+  }
+
+  @Test
+  public void initValueShouldInitZero()
+  {
+    target.initValue(buf, POSITION);
+    Assert.assertEquals(0, buf.getDouble(POSITION), EPSILON);
+  }
+
+  @Test
+  public void getAtPositionIsNullShouldReturnNull()
+  {
+    Assert.assertNull(target.get(buf, NULL_POSITION));
+  }
+
+  @Test
+  public void getAtPositionShouldReturnValue()
+  {
+    buf.putDouble(POSITION + 1, VALUES[3]);
+    Assert.assertEquals(VALUES[3], (double) target.get(buf, POSITION), EPSILON);
+  }
+
+  @Test
+  public void putValueShouldAddToBuffer()
+  {
+    Assert.assertTrue(target.putAnyValueFromRow(buf, POSITION, 2, 3));
+    Assert.assertEquals(VALUES[2], buf.getDouble(POSITION), EPSILON);
+  }
+
+  @Test
+  public void putValueStartAfterEndShouldNotAddToBuffer()
+  {
+    Assert.assertFalse(target.putAnyValueFromRow(buf, POSITION, 2, 2));
+    Assert.assertNotEquals(VALUES[2], buf.getDouble(POSITION));

Review comment:
       We should pass the delta to the `assertNotEquals` calls too.
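
For context, a hedged sketch of the difference a delta makes. JUnit 4 provides `Assert.assertNotEquals(double unexpected, double actual, double delta)`; called with only two doubles, the arguments are boxed and compared exactly. The helpers below are simplified stand-ins written for illustration, not JUnit's implementation, and they ignore NaN handling.

```java
final class DeltaComparisonSketch {
    // Exact comparison, which is what boxing both doubles and calling equals() amounts to.
    static boolean differsExactly(double unexpected, double actual) {
        return !Double.valueOf(unexpected).equals(Double.valueOf(actual));
    }

    // Tolerance-based comparison: the values differ only if they are more than delta apart.
    static boolean differsByMoreThan(double unexpected, double actual, double delta) {
        return Math.abs(unexpected - actual) > delta;
    }

    public static void main(String[] args) {
        double stored = 0.1 + 0.2; // not bit-identical to 0.3 in binary floating point
        System.out.println(differsExactly(0.3, stored));           // true
        System.out.println(differsByMoreThan(0.3, stored, 1e-15)); // false
    }
}
```

An exact "not equals" check can therefore pass on rounding noise alone, which is why using the same `EPSILON` as the `assertEquals` calls keeps the test consistent.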






[GitHub] [druid] lgtm-com[bot] commented on pull request #10338: WIP vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #10338:
URL: https://github.com/apache/druid/pull/10338#issuecomment-685059113


   This pull request **introduces 5 alerts** when merging 49e4f0ced8e81a243afeaca6d79048a7c36bf30f into e81a9df507e8f5c30cf5eebdc6d29cedf34626ba - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-09f8b4add47200e775dab06bf1b17b1287787269)
   
   **new alerts:**
   
   * 5 for Array index out of bounds




[GitHub] [druid] suneet-s merged pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
suneet-s merged pull request #10338:
URL: https://github.com/apache/druid/pull/10338


   




[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483719357



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/StringAnyAggregatorFactory.java
##########
@@ -77,6 +81,33 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
     return new StringAnyBufferAggregator(metricFactory.makeColumnValueSelector(fieldName), maxStringBytes);
   }
 
+  @Override
+  public StringAnyVectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
+  {
+
+    ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
+    if (capabilities == null || capabilities.hasMultipleValues().isMaybeTrue()) {
+      return new StringAnyVectorAggregator(
+          null,
+          selectorFactory.makeMultiValueDimensionSelector(DefaultDimensionSpec.of(fieldName)),
+          maxStringBytes
+      );
+    } else {
+      return new StringAnyVectorAggregator(
+          selectorFactory.makeSingleValueDimensionSelector(DefaultDimensionSpec.of(fieldName)),
+          null,
+          maxStringBytes
+      );
+    }
+  }
+
+  @Override
+  public boolean canVectorize(ColumnInspector columnInspector)
+  {
+    ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
+    return capabilities == null || capabilities.getType() == ValueType.STRING;

Review comment:
       Thanks 👍 






[GitHub] [druid] suneet-s commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r485020245



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/FloatAnyVectorAggregator.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Vectorized implementation of the {@link FloatAnyBufferAggregator}
+ */
+public class FloatAnyVectorAggregator extends NumericAnyVectorAggregator
+{
+  public FloatAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    super(vectorValueSelector);
+  }
+
+  @Override
+  void initValue(ByteBuffer buf, int position)
+  {
+    buf.putFloat(position, 0F);
+  }
+
+  @Override
+  boolean putAnyValueFromRow(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    float[] values = vectorValueSelector.getFloatVector();
+    boolean isRowsWithinIndex = startRow < endRow && startRow < values.length;

Review comment:
       I wasn't sure whether it's needed. I think the other aggregators perform these checks implicitly within their for loops.
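
A minimal sketch of the explicit guard being discussed, under the assumption that the method should report whether a value was actually copied. `RowGuardSketch` and `putFirstValue` are hypothetical names, and the `out` array stands in for the aggregation buffer.

```java
final class RowGuardSketch {
    // Copies values[startRow] into out[0] only when the row range is non-empty and in bounds.
    // Returns true when a value was copied, false otherwise.
    static boolean putFirstValue(float[] values, int startRow, int endRow, float[] out) {
        boolean withinIndex = startRow < endRow && startRow < values.length;
        if (withinIndex) {
            out[0] = values[startRow];
        }
        return withinIndex;
    }

    public static void main(String[] args) {
        float[] values = {7.8f, 11f, 23.67f};
        float[] out = new float[1];
        System.out.println(putFirstValue(values, 2, 3, out)); // true: row 2 exists and the range is non-empty
        System.out.println(putFirstValue(values, 2, 2, out)); // false: empty range
        System.out.println(putFirstValue(values, 5, 6, out)); // false: row 5 is past the end of the vector
    }
}
```

A loop of the form `for (int i = startRow; i < endRow; i++)` gets the empty-range check for free, but an aggregator that reads a single element directly has no loop condition to rely on, so the guard has to be spelled out.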






[GitHub] [druid] suneet-s commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r485023760



##########
File path: processing/src/test/java/org/apache/druid/query/aggregation/any/DoubleAnyVectorAggregatorTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.mockito.Mockito.spy;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DoubleAnyVectorAggregatorTest extends InitializedNullHandlingTest
+{
+  private static final int NULL_POSITION = 32;
+  private static final int POSITION = 2;
+  private static final double EPSILON = 1e-15;
+  private static final double[] VALUES = new double[]{7.8d, 11, 23.67, 60, 123};
+
+  private ByteBuffer buf;
+  @Mock
+  private VectorValueSelector selector;
+
+  private DoubleAnyVectorAggregator target;
+
+  @Before
+  public void setUp()
+  {
+    byte[] randomBytes = new byte[128];
+    ThreadLocalRandom.current().nextBytes(randomBytes);
+    buf = ByteBuffer.wrap(randomBytes);
+    Mockito.doReturn(VALUES).when(selector).getDoubleVector();
+
+    target = spy(new DoubleAnyVectorAggregator(selector));
+    Mockito.when(target.isValueNull(buf, NULL_POSITION)).thenReturn(true);
+    Mockito.when(target.isValueNull(buf, POSITION)).thenReturn(false);
+  }
+
+  @Test
+  public void initValueShouldInitZero()
+  {
+    target.initValue(buf, POSITION);
+    Assert.assertEquals(0, buf.getDouble(POSITION), EPSILON);
+  }
+
+  @Test
+  public void getAtPositionIsNullShouldReturnNull()
+  {
+    Assert.assertNull(target.get(buf, NULL_POSITION));
+  }
+
+  @Test
+  public void getAtPositionShouldReturnValue()
+  {
+    buf.putDouble(POSITION + 1, VALUES[3]);
+    Assert.assertEquals(VALUES[3], (double) target.get(buf, POSITION), EPSILON);
+  }
+
+  @Test
+  public void putValueShouldAddToBuffer()
+  {
+    Assert.assertTrue(target.putAnyValueFromRow(buf, POSITION, 2, 3));
+    Assert.assertEquals(VALUES[2], buf.getDouble(POSITION), EPSILON);
+  }
+
+  @Test
+  public void putValueStartAfterEndShouldNotAddToBuffer()
+  {
+    Assert.assertFalse(target.putAnyValueFromRow(buf, POSITION, 2, 2));
+    Assert.assertNotEquals(VALUES[2], buf.getDouble(POSITION));

Review comment:
       Will add in the next patch. Waiting to see if any other changes are needed.






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483467418



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);
+
+  /**
+   * Place the primitive null value in the buffer, fiven the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putNonNullValue(ByteBuffer buf, int position, Object value);
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, replaceWithDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
+    initValue(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
+      boolean[] nulls = vectorValueSelector.getNullVector();
+      // check if there are any nulls
+      if (nulls != null && startRow <= nulls.length) {
+        for (int i = startRow; i < endRow; i++) {
+          // And there is actually a null
+          if (nulls[i]) {
+            putNull(buf, position);
+            return;
+          }
+        }
+      }
+      // There are no nulls, so put a value from the value selector
+      putValue(buf, position, startRow);
+      buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE));
+    }
+  }
+
+  @Override
+  public void aggregate(
+      ByteBuffer buf,
+      int numRows,
+      int[] positions,
+      @Nullable int[] rows,
+      int positionOffset
+  )
+  {
+    for (int i = 0; i < numRows; i++) {
+      int position = positions[i] + positionOffset;
+      int row = rows == null ? i : rows[i];
+      aggregate(buf, position, row, row);
+    }
+  }
+
+  @Override
+  public void close()
+  {
+    // No resources to cleanup.
+  }
+
+  protected boolean isValueNull(ByteBuffer buf, int position)
+  {
+    return (buf.get(position) & BYTE_FLAG_NULL_MASK) == NullHandling.IS_NULL_BYTE;
+  }
+
+  private void putNull(ByteBuffer buf, int position)
+  {
+    if (!replaceWithDefault) {
+      buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NULL_BYTE));

Review comment:
       I may be wrong here, but shouldn't it be `buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | BYTE_FLAG_NULL_MASK | NullHandling.IS_NULL_BYTE))`?
   I see the same thing being done in the buffer aggregator, but I am not able to see how that is right.
   There are no tests for this either.






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483453937



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);
+
+  /**
+   * Place the primitive null value in the buffer, fiven the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putNonNullValue(ByteBuffer buf, int position, Object value);
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, replaceWithDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
+    initValue(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
+      boolean[] nulls = vectorValueSelector.getNullVector();
+      // check if there are any nulls
+      if (nulls != null && startRow <= nulls.length) {

Review comment:
       Or should this be `endRow <= nulls.length`? The loop that follows indexes `nulls[i]` for every `i` up to `endRow - 1`.
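
A minimal sketch of the bounds check being discussed, assuming `startRow >= 0`. The class and method names (`NullScanSketch`, `hasNullInRange`) are hypothetical and not part of the PR. The sketch only illustrates why guarding on `endRow` rather than `startRow` keeps every index inside the null vector.

```java
// Hypothetical helper for illustration only; not part of the PR under review.
final class NullScanSketch
{
  /**
   * Returns true if any row in [startRow, endRow) is flagged null.
   * A null vector (no nulls in the column) or an endRow past the end of the
   * vector falls through to "no null seen", like the quoted code does when its guard fails.
   */
  static boolean hasNullInRange(boolean[] nulls, int startRow, int endRow)
  {
    // Guard on endRow: the loop reads nulls[i] for i up to endRow - 1
    if (nulls == null || endRow > nulls.length) {
      return false;
    }
    for (int i = startRow; i < endRow; i++) {
      if (nulls[i]) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args)
  {
    boolean[] nulls = {false, true, false};
    System.out.println(hasNullInRange(nulls, 0, 3));
    // With a guard of startRow <= nulls.length this call would read nulls[3] and throw
    System.out.println(hasNullInRange(nulls, 2, 5));
  }
}
```

With a guard on `startRow` alone, a range such as `[2, 5)` over a three-element vector passes the check and then reads past the end of the array.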






[GitHub] [druid] abhishekagarwal87 commented on pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on pull request #10338:
URL: https://github.com/apache/druid/pull/10338#issuecomment-691457926








[GitHub] [druid] clintropolis commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r488262030



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericNilVectorAggregator.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * A vector aggregator that returns the default numeric value.
+ */
+public abstract class NumericNilVectorAggregator implements VectorAggregator

Review comment:
       nit: this can extend `NoopVectorAggregator` to reduce boilerplate. I also don't think you need separate classes for each type: since the number is an object anyway, you can just instantiate each singleton using `NullHandling.defaultLongValue()` etc.
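
A rough sketch of the shape this suggestion points at, written without the Druid classes so that it stands alone. `NoopAggregatorStandIn` is a local stand-in and not Druid's `NoopVectorAggregator`. The singleton names and the hard-coded zero defaults are assumptions for illustration. In the real code the defaults would come from `NullHandling.defaultLongValue()` and its siblings.

```java
import java.nio.ByteBuffer;

// Local stand-in for a no-op base class; NOT Druid's NoopVectorAggregator, only the shape needed here.
abstract class NoopAggregatorStandIn
{
  void init(ByteBuffer buf, int position)
  {
    // nothing to initialize
  }

  void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
  {
    // nothing to aggregate
  }

  abstract Object get(ByteBuffer buf, int position);
}

// One class for all numeric types: the default is held as a boxed Object.
final class NilVectorAggregatorSketch extends NoopAggregatorStandIn
{
  // Placeholder defaults (assumption); the real values depend on the null-handling mode
  static final NilVectorAggregatorSketch LONG_NIL = new NilVectorAggregatorSketch(0L);
  static final NilVectorAggregatorSketch FLOAT_NIL = new NilVectorAggregatorSketch(0.0f);
  static final NilVectorAggregatorSketch DOUBLE_NIL = new NilVectorAggregatorSketch(0.0d);

  private final Object defaultValue;

  private NilVectorAggregatorSketch(Object defaultValue)
  {
    this.defaultValue = defaultValue;
  }

  @Override
  Object get(ByteBuffer buf, int position)
  {
    // The buffer is never read; every position yields the same default
    return defaultValue;
  }

  public static void main(String[] args)
  {
    System.out.println(LONG_NIL.get(null, 0));
    System.out.println(DOUBLE_NIL.get(null, 0));
  }
}
```

Because the aggregator never touches the buffer, one immutable instance per numeric type is enough, which is what makes the singleton approach attractive.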






[GitHub] [druid] lgtm-com[bot] commented on pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #10338:
URL: https://github.com/apache/druid/pull/10338#issuecomment-686848183


   This pull request **introduces 2 alerts** when merging 8ba5de97b5c4c8862ff9e36a6d801e5c482cd6be into 3fc8bc0701938b532282b6b50398c0ee6503a517 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-bca83f32b5e58c583f8b5bcc078994c65bb07e99)
   
   **new alerts:**
   
   * 2 for Array index out of bounds




[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483467418



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);
+
+  /**
+   * Place the primitive null value in the buffer, fiven the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putNonNullValue(ByteBuffer buf, int position, Object value);
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, replaceWithDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
+    initValue(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
+      boolean[] nulls = vectorValueSelector.getNullVector();
+      // check if there are any nulls
+      if (nulls != null && startRow <= nulls.length) {
+        for (int i = startRow; i < endRow; i++) {
+          // And there is actually a null
+          if (nulls[i]) {
+            putNull(buf, position);
+            return;
+          }
+        }
+      }
+      // There are no nulls, so put a value from the value selector
+      putValue(buf, position, startRow);
+      buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE));
+    }
+  }
+
+  @Override
+  public void aggregate(
+      ByteBuffer buf,
+      int numRows,
+      int[] positions,
+      @Nullable int[] rows,
+      int positionOffset
+  )
+  {
+    for (int i = 0; i < numRows; i++) {
+      int position = positions[i] + positionOffset;
+      int row = rows == null ? i : rows[i];
+      aggregate(buf, position, row, row);
+    }
+  }
+
+  @Override
+  public void close()
+  {
+    // No resources to cleanup.
+  }
+
+  protected boolean isValueNull(ByteBuffer buf, int position)
+  {
+    return (buf.get(position) & BYTE_FLAG_NULL_MASK) == NullHandling.IS_NULL_BYTE;
+  }
+
+  private void putNull(ByteBuffer buf, int position)
+  {
+    if (!replaceWithDefault) {
+      buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NULL_BYTE));

Review comment:
       Never mind, I see why it will work: `NullHandling.IS_NULL_BYTE` is `(byte) 1`, so OR-ing it in already sets the bit covered by `BYTE_FLAG_NULL_MASK`.
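
The flag arithmetic behind this observation can be sketched as below. The constants are redefined locally so the example stands alone. `IS_NULL_BYTE = 1` comes from the comment above, while `IS_NOT_NULL_BYTE = 0` is an assumption about `NullHandling`. The class name `NullFlagSketch` is hypothetical.

```java
import java.nio.ByteBuffer;

// Stand-alone illustration; the constants mirror the quoted diff but are redefined here.
final class NullFlagSketch
{
  static final byte IS_NULL_BYTE = 1;       // value stated in the review comment
  static final byte IS_NOT_NULL_BYTE = 0;   // assumption: the complementary value in NullHandling
  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
  static final byte BYTE_FLAG_NULL_MASK = 0x01;

  static boolean isFound(ByteBuffer buf, int position)
  {
    return (buf.get(position) & BYTE_FLAG_FOUND_MASK) == BYTE_FLAG_FOUND_MASK;
  }

  static boolean isValueNull(ByteBuffer buf, int position)
  {
    return (buf.get(position) & BYTE_FLAG_NULL_MASK) == IS_NULL_BYTE;
  }

  public static void main(String[] args)
  {
    ByteBuffer buf = ByteBuffer.allocate(2);
    // "found, and the value is null": 0x02 | 0x01 = 0x03
    buf.put(0, (byte) (BYTE_FLAG_FOUND_MASK | IS_NULL_BYTE));
    // "found, and the value is not null": 0x02 | 0x00 = 0x02
    buf.put(1, (byte) (BYTE_FLAG_FOUND_MASK | IS_NOT_NULL_BYTE));
    System.out.println(isFound(buf, 0) + " " + isValueNull(buf, 0));
    System.out.println(isFound(buf, 1) + " " + isValueNull(buf, 1));
  }
}
```

Since `IS_NULL_BYTE` and `BYTE_FLAG_NULL_MASK` have the same bit pattern, adding the mask to the OR expression would not change the stored byte.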






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483462463



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);
+
+  /**
+   * Place the primitive null value in the buffer, fiven the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putNonNullValue(ByteBuffer buf, int position, Object value);
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, replaceWithDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
+    initValue(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
+      boolean[] nulls = vectorValueSelector.getNullVector();
+      // check if there are any nulls
+      if (nulls != null && startRow <= nulls.length) {
+        for (int i = startRow; i < endRow; i++) {
+          // And there is actually a null
+          if (nulls[i]) {
+            putNull(buf, position);
+            return;
+          }
+        }
+      }
+      // There are no nulls, so put a value from the value selector
+      putValue(buf, position, startRow);
+      buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE));
+    }
+  }
+
+  @Override
+  public void aggregate(
+      ByteBuffer buf,
+      int numRows,
+      int[] positions,
+      @Nullable int[] rows,
+      int positionOffset
+  )
+  {
+    for (int i = 0; i < numRows; i++) {
+      int position = positions[i] + positionOffset;
+      int row = rows == null ? i : rows[i];
+      aggregate(buf, position, row, row);

Review comment:
       I think it may be more efficient to not call `aggregate`. E.g. if a value is already found, we still iterate through the loop, which could have been avoided. What do you think?
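
One possible reading of this suggestion, sketched as a stand-alone example: handle each position inline, skip slots whose found flag is already set, and look at exactly one row per slot instead of delegating to the range-based `aggregate`. The class name, the `long[] values` and `boolean[] nulls` parameters, and the buffer layout are assumptions for illustration. The `replaceWithDefault` branch of the real code is left out for brevity.

```java
import java.nio.ByteBuffer;

// Illustrative only; simplified to long values and SQL-style nulls (no replaceWithDefault handling).
final class PerPositionAggregateSketch
{
  static final byte FOUND = 0x02;
  static final byte IS_NULL = 0x01;
  static final int VALUE_OFFSET = Byte.BYTES;  // value is stored right after the flag byte

  static void aggregate(
      ByteBuffer buf,
      int numRows,
      int[] positions,
      int[] rows,          // may be null, meaning row i maps to position i
      int positionOffset,
      long[] values,
      boolean[] nulls      // may be null, meaning the column has no nulls
  )
  {
    for (int i = 0; i < numRows; i++) {
      int position = positions[i] + positionOffset;
      if ((buf.get(position) & FOUND) == FOUND) {
        continue;  // this slot already holds its ANY value; nothing more to do
      }
      int row = rows == null ? i : rows[i];
      if (nulls != null && nulls[row]) {
        buf.put(position, (byte) (FOUND | IS_NULL));
      } else {
        buf.putLong(position + VALUE_OFFSET, values[row]);
        buf.put(position, FOUND);
      }
    }
  }

  public static void main(String[] args)
  {
    ByteBuffer buf = ByteBuffer.allocate(18);  // two slots of 1 flag byte + 8 value bytes
    int[] positions = {0, 9};
    aggregate(buf, 2, positions, null, 0, new long[]{5L, 7L}, new boolean[]{false, true});
    // A second batch must not overwrite slots that are already marked found
    aggregate(buf, 2, positions, null, 0, new long[]{100L, 200L}, null);
    System.out.println(buf.getLong(VALUE_OFFSET) + " flag=" + buf.get(0) + " flag=" + buf.get(9));
  }
}
```

Each position is a separate buffer slot, so the outer loop over positions cannot be skipped. The saving in this reading comes from avoiding the per-slot method call and range scan.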






[GitHub] [druid] suneet-s commented on a change in pull request #10338: WIP vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r481311373



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/StringAnyAggregatorFactory.java
##########
@@ -77,6 +82,37 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
     return new StringAnyBufferAggregator(metricFactory.makeColumnValueSelector(fieldName), maxStringBytes);
   }
 
+  @Override
+  public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
+  {
+
+    ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
+    if (capabilities == null || capabilities.hasMultipleValues().isFalse()) {
+      return new StringAnyVectorAggregator(
+          selectorFactory.makeSingleValueDimensionSelector(DefaultDimensionSpec.of(fieldName)),
+          null,
+          maxStringBytes
+      );
+    } else {
+      return new StringAnyVectorAggregator(
+          null,
+          selectorFactory.makeMultiValueDimensionSelector(DefaultDimensionSpec.of(fieldName)),
+          maxStringBytes
+      );
+
+    }
+  }
+
+  @Override
+  public boolean canVectorize(ColumnInspector columnInspector)
+  {
+    if (fieldName != null) {
+      ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
+      return capabilities == null || capabilities.getType() == ValueType.STRING;

Review comment:
       Should any column type be allowed?






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483462463



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);
+
+  /**
+   * Place the primitive null value in the buffer, fiven the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putNonNullValue(ByteBuffer buf, int position, Object value);
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, replaceWithDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
+    initValue(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
+      boolean[] nulls = vectorValueSelector.getNullVector();
+      // check if there are any nulls
+      if (nulls != null && startRow <= nulls.length) {
+        for (int i = startRow; i < endRow; i++) {
+          // And there is actually a null
+          if (nulls[i]) {
+            putNull(buf, position);
+            return;
+          }
+        }
+      }
+      // There are no nulls, so put a value from the value selector
+      putValue(buf, position, startRow);
+      buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE));
+    }
+  }
+
+  @Override
+  public void aggregate(
+      ByteBuffer buf,
+      int numRows,
+      int[] positions,
+      @Nullable int[] rows,
+      int positionOffset
+  )
+  {
+    for (int i = 0; i < numRows; i++) {
+      int position = positions[i] + positionOffset;
+      int row = rows == null ? i : rows[i];
+      aggregate(buf, position, row, row);

Review comment:
       I think it may be more efficient not to call `aggregate` here. E.g. if a value has already been found, we still iterate through the loop, which could be avoided.
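One way to read this suggestion, as a minimal sketch: check the found flag inline in the batch loop and skip positions that already hold a value, instead of delegating to the single-position `aggregate`. The class `AnyBatchSketch`, the `values` and `nulls` arrays (standing in for the selector's vectors), and the long-valued layout are assumptions for illustration, not code from the PR. Null handling is simplified so that a null row is always recorded as null.

```java
import java.nio.ByteBuffer;

public class AnyBatchSketch {
    // Flag bits mirrored from the diff above (found = 0x02, null = 0x01).
    static final byte FOUND = 0x02;
    static final byte IS_NULL = 0x01;

    // Hypothetical batch aggregate: values/nulls stand in for the selector's vectors.
    static void aggregate(ByteBuffer buf, int numRows, int[] positions, int[] rows,
                          int positionOffset, long[] values, boolean[] nulls) {
        for (int i = 0; i < numRows; i++) {
            int position = positions[i] + positionOffset;
            // Skip immediately when this position already holds a value.
            if ((buf.get(position) & FOUND) == FOUND) {
                continue;
            }
            int row = rows == null ? i : rows[i];
            if (nulls != null && nulls[row]) {
                // Record that a null was found for this position.
                buf.put(position, (byte) (FOUND | IS_NULL));
            } else {
                // Store the value after the one-byte flag slot, then mark it found.
                buf.putLong(position + Byte.BYTES, values[row]);
                buf.put(position, FOUND);
            }
        }
    }
}
```

With this shape, a position whose flag byte already has the found bit set costs a single `buf.get` per row.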






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483470098



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);
+
+  /**
+   * Place the primitive null value in the buffer, fiven the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putNonNullValue(ByteBuffer buf, int position, Object value);
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, replaceWithDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
+    initValue(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
+      boolean[] nulls = vectorValueSelector.getNullVector();
+      // check if there are any nulls
+      if (nulls != null && startRow <= nulls.length) {
+        for (int i = startRow; i < endRow; i++) {
+          // And there is actually a null
+          if (nulls[i]) {
+            putNull(buf, position);
+            return;
+          }
+        }
+      }
+      // There are no nulls, so put a value from the value selector
+      putValue(buf, position, startRow);
+      buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE));
+    }
+  }
+
+  @Override
+  public void aggregate(
+      ByteBuffer buf,
+      int numRows,
+      int[] positions,
+      @Nullable int[] rows,
+      int positionOffset
+  )
+  {
+    for (int i = 0; i < numRows; i++) {
+      int position = positions[i] + positionOffset;
+      int row = rows == null ? i : rows[i];
+      aggregate(buf, position, row, row);
+    }
+  }
+
+  @Override
+  public void close()
+  {
+    // No resources to cleanup.
+  }
+
+  protected boolean isValueNull(ByteBuffer buf, int position)
+  {
+    return (buf.get(position) & BYTE_FLAG_NULL_MASK) == NullHandling.IS_NULL_BYTE;
+  }
+
+  private void putNull(ByteBuffer buf, int position)
+  {
+    if (!replaceWithDefault) {
+      buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NULL_BYTE));

Review comment:
       Never mind, I see why it will work: `NullHandling.IS_NULL_BYTE` is `(byte) 1`.
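To spell out why this works, here is a small sketch of the flag-byte arithmetic. The two masks are copied from the diff above and `IS_NULL_BYTE = 1` from this comment; `IS_NOT_NULL_BYTE = 0` and the class name `AnyFlagSketch` are assumptions made for the example.

```java
public class AnyFlagSketch {
    // IS_NULL_BYTE = 1 is stated in the review comment; IS_NOT_NULL_BYTE = 0 is assumed.
    static final byte IS_NULL_BYTE = 1;
    static final byte IS_NOT_NULL_BYTE = 0;
    // Masks as declared in the diff above.
    static final byte BYTE_FLAG_FOUND_MASK = 0x02;
    static final byte BYTE_FLAG_NULL_MASK = 0x01;

    static boolean isFound(byte flags) {
        return (flags & BYTE_FLAG_FOUND_MASK) == BYTE_FLAG_FOUND_MASK;
    }

    static boolean isNull(byte flags) {
        return (flags & BYTE_FLAG_NULL_MASK) == IS_NULL_BYTE;
    }

    public static void main(String[] args) {
        byte foundNull = (byte) (BYTE_FLAG_FOUND_MASK | IS_NULL_BYTE);      // binary 11
        byte foundValue = (byte) (BYTE_FLAG_FOUND_MASK | IS_NOT_NULL_BYTE); // binary 10
        System.out.println(isFound(foundNull) + " " + isNull(foundNull));   // prints "true true"
        System.out.println(isFound(foundValue) + " " + isNull(foundValue)); // prints "true false"
    }
}
```

Because the null marker occupies only the lowest bit, OR-ing it with the found mask keeps both pieces of state readable from the same byte.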






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r483474959



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/NumericAnyVectorAggregator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericAnyVectorAggregator implements VectorAggregator
+{
+  // Rightmost bit for is null check (0 for is null and 1 for not null)
+  // Second rightmost bit for is found check (0 for not found and 1 for found)
+  @VisibleForTesting
+  static final byte BYTE_FLAG_FOUND_MASK = 0x02;
+  private static final byte BYTE_FLAG_NULL_MASK = 0x01;
+  protected static final int FOUND_VALUE_OFFSET = Byte.BYTES;
+
+  private final boolean replaceWithDefault = NullHandling.replaceWithDefault();
+  protected final VectorValueSelector vectorValueSelector;
+
+  public NumericAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    this.vectorValueSelector = vectorValueSelector;
+  }
+
+  /**
+   * Initialize the buffer value given the initial offset position within the byte buffer for initialization
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Place the primitive value in the buffer given the initial offset position within the byte buffer
+   * at which the current aggregate value is stored.
+   */
+  abstract void putValue(ByteBuffer buf, int position, int row);
+
+  /**
+   * Place the primitive null value in the buffer, fiven the initial offset position within the byte buffer

Review comment:
       This should say `non null`.






[GitHub] [druid] suneet-s commented on a change in pull request #10338: WIP vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r481310684



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/DoubleAnyAggregatorFactory.java
##########
@@ -112,6 +115,18 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
     }
   }
 
+  @Override
+  public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
+  {
+    return new DoubleAnyVectorAggregator(selectorFactory.makeValueSelector(fieldName));
+  }
+
+  @Override
+  public boolean canVectorize(ColumnInspector columnInspector)
+  {
+    return true;

Review comment:
       Should this check the column type before offering a vectorized implementation? Same question for the other factories.
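A minimal sketch of what such a check could look like for the numeric factories, assuming that a column with no capabilities stays vectorizable, as the string factory in this PR does. The `ValueType` enum and `canVectorizeNumeric` below are hypothetical stand-ins written for this example, not Druid's classes.

```java
public class CanVectorizeSketch {
    // Hypothetical stand-in for the column type; not Druid's ValueType.
    enum ValueType { STRING, LONG, FLOAT, DOUBLE, COMPLEX }

    // A null type stands in for "no capabilities reported for this column".
    static boolean canVectorizeNumeric(ValueType type) {
        return type == null
            || type == ValueType.LONG
            || type == ValueType.FLOAT
            || type == ValueType.DOUBLE;
    }

    public static void main(String[] args) {
        System.out.println(canVectorizeNumeric(null));             // prints "true"
        System.out.println(canVectorizeNumeric(ValueType.DOUBLE)); // prints "true"
        System.out.println(canVectorizeNumeric(ValueType.STRING)); // prints "false"
    }
}
```

Returning `false` for non-numeric types would make the query fall back to the non-vectorized path rather than build a value selector over an unsupported column.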






[GitHub] [druid] abhishekagarwal87 commented on pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on pull request #10338:
URL: https://github.com/apache/druid/pull/10338#issuecomment-691457926


   Looks good. Please fix the docs spell-check failure.




[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10338: Vectorized ANY aggregators

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10338:
URL: https://github.com/apache/druid/pull/10338#discussion_r484251440



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/any/FloatAnyVectorAggregator.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.any;
+
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Vectorized implementation of the {@link FloatAnyBufferAggregator}
+ */
+public class FloatAnyVectorAggregator extends NumericAnyVectorAggregator
+{
+  public FloatAnyVectorAggregator(VectorValueSelector vectorValueSelector)
+  {
+    super(vectorValueSelector);
+  }
+
+  @Override
+  void initValue(ByteBuffer buf, int position)
+  {
+    buf.putFloat(position, 0F);
+  }
+
+  @Override
+  boolean putAnyValueFromRow(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    float[] values = vectorValueSelector.getFloatVector();
+    boolean isRowsWithinIndex = startRow < endRow && startRow < values.length;

Review comment:
       By the way, are these checks necessary? The framework calling the aggregators should take care of passing valid ranges. If it doesn't, it's a bug and an exception will be raised. I didn't notice these checks in other vector aggregator implementations.
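For illustration, a sketch of the variant without the defensive checks: the method trusts the caller's range, and an invalid row index surfaces as an `ArrayIndexOutOfBoundsException` from the array access itself. `RangeCheckSketch`, `putFirstValue`, and writing the value one byte past `position` are assumptions made for this example, not code from the PR.

```java
import java.nio.ByteBuffer;

public class RangeCheckSketch {
    // Writes values[startRow] after a one-byte flag slot; performs no bounds checks of its own.
    static void putFirstValue(ByteBuffer buf, int position, float[] values, int startRow) {
        buf.putFloat(position + Byte.BYTES, values[startRow]);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        putFirstValue(buf, 0, new float[]{1.5f, 2.5f}, 1);
        System.out.println(buf.getFloat(1)); // prints "2.5"
        try {
            // A caller bug: row 3 does not exist in a one-element vector.
            putFirstValue(buf, 0, new float[]{1.5f}, 3);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("invalid range rejected by the array access");
        }
    }
}
```

The trade-off is that a bad range fails loudly at the call site instead of being silently ignored by the aggregator.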



