Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/02/16 05:28:30 UTC

[GitHub] [arrow-datafusion] Ted-Jiang opened a new pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Ted-Jiang opened a new pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841


   # Which issue does this PR close?
   Closes #1823.
   
   
   # Rationale for this change
   Test results:
   ```
   1million_1million.parquet
   
   +----------------------------+
   | COUNT(DISTINCT test.value) |
   +----------------------------+
   | 631504                     |
   +----------------------------+
   1 row in set. Query took 1.225 seconds.
   
   
   +---------------------------------+
   | BITMAPCOUNTDISTINCT(test.value) |
   +---------------------------------+
   | 631504                          |
   +---------------------------------+
   1 row in set. Query took 0.175 seconds (roaring-rs).
   
   
   +---------------------------------+
   | BITMAPCOUNTDISTINCT(test.value) |
   +---------------------------------+
   | 631504                          |
   +---------------------------------+
   1 row in set. Query took 0.052 seconds (croaring-rs).
   
   
   +----------------------------+
   | APPROXDISTINCT(test.value) |
   +----------------------------+
   | 630261                     |
   +----------------------------+
   1 row in set. Query took 0.052 seconds.
   
   ```
   So I chose the [croaring-rs](https://github.com/saulius/croaring-rs) bitmap. It is the fastest exact option above: it is as fast as `APPROXDISTINCT` and still returns the exact count.
   
   # What changes are included in this PR?
   Adds a new aggregate function called `bitmap_distinct`.
   
   # Are there any user-facing changes?
   Possibly in the future. We could add an optimizer rule, like the one for `approx_median()`, that rewrites `count(distinct x)` to `bitmap_distinct`.
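Some readers may not have seen how a bitmap gives an exact distinct count. Below is a deliberately simplified, std-only Rust sketch of the roaring idea; it is not croaring-rs or roaring-rs code.

- `SimpleBitmap` is a hypothetical name.
- It only uses dense bitmap containers. Real roaring bitmaps also switch to sorted-array and run containers.
- Each `u32` is split into a 16-bit key and a 16-bit offset.
- The distinct count is the total popcount, and merging partial results is a bitwise OR.

```rust
use std::collections::BTreeMap;

/// Simplified roaring-style bitmap (hypothetical, illustration only).
/// High 16 bits select a container; low 16 bits select a bit inside a
/// 65,536-bit container stored as 1,024 u64 words.
#[derive(Default)]
struct SimpleBitmap {
    containers: BTreeMap<u16, Vec<u64>>,
}

impl SimpleBitmap {
    fn insert(&mut self, value: u32) {
        let key = (value >> 16) as u16;
        let low = (value & 0xFFFF) as usize;
        let words = self.containers.entry(key).or_insert_with(|| vec![0u64; 1024]);
        words[low / 64] |= 1u64 << (low % 64);
    }

    /// Exact number of distinct values inserted: the total popcount.
    fn cardinality(&self) -> u64 {
        self.containers
            .values()
            .flat_map(|words| words.iter())
            .map(|w| w.count_ones() as u64)
            .sum()
    }

    /// Merge another partial bitmap into this one with a bitwise OR.
    fn union_with(&mut self, other: &SimpleBitmap) {
        for (key, other_words) in &other.containers {
            let words = self.containers.entry(*key).or_insert_with(|| vec![0u64; 1024]);
            for (w, o) in words.iter_mut().zip(other_words) {
                *w |= *o;
            }
        }
    }
}
```

Union is just OR and the count is a popcount, so partial bitmaps from different partitions merge exactly. This is why the bitmap approach returns the same value as `COUNT(DISTINCT ...)` in the results above.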


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Ted-Jiang edited a comment on pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang edited a comment on pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#issuecomment-1042631149


   > * I wonder if/how this gets things closer to being able to do distinct on compressed data (in DF's case on dictionary encoded columns). The problem (as I understand it) is that there is no guarantee that Arrow dictionaries have the same encoded representation for a value across batches, or even in the same record batch (if I remember how dictionary concatenation currently works in Arrow).
   
   Yes, there is no guarantee that Arrow dictionaries have the same encoded representation for a value across batches.
   We plan to maintain a global dictionary that encodes string columns into 32-bit integers to accelerate count distinct.
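Below is a minimal sketch of what such a global dictionary might look like, assuming a single in-process `HashMap`. The `GlobalDictionary` type is hypothetical, and a real design would also need to be shared across partitions and threads. The point is that the same string always maps to the same `u32`, whichever batch it arrives in, so the ids can be fed directly into a 32-bit bitmap.

```rust
use std::collections::HashMap;

/// Hypothetical global dictionary: one stable u32 id per distinct string,
/// shared across all batches (unlike per-batch Arrow dictionaries).
#[derive(Default)]
struct GlobalDictionary {
    ids: HashMap<String, u32>,
}

impl GlobalDictionary {
    /// Return the existing id for `s`, or assign the next unused id.
    fn encode(&mut self, s: &str) -> u32 {
        if let Some(&id) = self.ids.get(s) {
            return id;
        }
        let id = self.ids.len() as u32;
        self.ids.insert(s.to_string(), id);
        id
    }
}
```

For example, encoding the batches `["a", "b", "a"]` and then `["b", "c"]` yields ids `[0, 1, 0, 1, 2]`.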
   
   > * Would this work on 64-bit columns if they could first be casted to 32-bit? That is, assuming the contents of the 64-bit column actually fit as 32-bit unsigned integers?
   
   IMO, casting to 32 bits drops the upper 32 bits, so values that differ only in those bits would be counted as one and the result would be incorrect.
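A small std-only illustration of that concern follows; the helper names are hypothetical. `as u32` keeps only the low 32 bits, so values that differ only in their upper bits collide after the cast. The cast is lossless only when every value in the column already fits in 32 bits, and that would have to be checked first.

```rust
use std::collections::HashSet;

/// Exact distinct count on the original 64-bit values.
fn distinct_u64(values: &[u64]) -> usize {
    values.iter().collect::<HashSet<_>>().len()
}

/// Distinct count after `as u32`, which silently truncates to the low 32 bits.
fn distinct_after_truncation(values: &[u64]) -> usize {
    values.iter().map(|&v| v as u32).collect::<HashSet<_>>().len()
}
```

With `[0, 1 << 32, 7]` the exact count is 3, but after truncation it is 2, because `0` and `1 << 32` both become `0`.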





[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r808702220



##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::borrow::Borrow;
+
+use std::fmt::Debug;
+use std::ops::BitOrAssign;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array,
+    UInt32Array, UInt8Array,
+};
+use arrow::datatypes::{DataType, Field};
+use croaring::Bitmap;
+use log::info;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+
+use super::format_state_name;
+
+#[derive(Debug)]
+pub struct BitMapDistinct {
+    name: String,
+    input_data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl BitMapDistinct {
+    /// Create a new bitmapDistinct aggregate function.
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            input_data_type,
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for BitMapDistinct {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// the field of the final result of this aggregation.
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::UInt64, false))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
+            DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32 => Box::new(BitmapDistinctCountAccumulator::try_new()),
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Support for 'bitmap_distinct' for data type {} is not implemented",
+                    other
+                )))
+            }
+        };
+        Ok(accumulator)
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "bitmap_registers"),
+            DataType::Binary,
+            false,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct BitmapDistinctCountAccumulator {
+    bitmap: croaring::bitmap::Bitmap,
+}
+
+impl BitmapDistinctCountAccumulator {
+    fn try_new() -> Self {
+        Self {
+            bitmap: croaring::bitmap::Bitmap::create(),
+        }
+    }
+}
+
+impl Accumulator for BitmapDistinctCountAccumulator {
+    //state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values.
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        //maybe run optimized

Review comment:
       Should we check the bitmap size first and then decide whether to run the optimization?
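For context, a roaring bitmap stores each 16-bit chunk of the key space in one of three container types. Run optimization means switching a container to run-length encoding when that is smaller.

The sketch below approximates that per-container size comparison, using these sizes:

- array container: 2 bytes per value
- bitmap container: fixed 8 KiB
- run container: 2 bytes plus 4 bytes per run

It is a hypothetical illustration, not croaring's code. As far as I know, CRoaring's run optimization already makes a per-container decision like this, so an extra global size check before calling it may not gain much.

```rust
/// Container kinds a roaring bitmap can use for one 16-bit key.
#[derive(Debug, PartialEq)]
enum Container {
    Array,
    Bitmap,
    Run,
}

/// Count maximal runs of consecutive values in sorted, deduplicated input.
fn count_runs(sorted: &[u16]) -> usize {
    if sorted.is_empty() {
        return 0;
    }
    // A new run starts wherever the next value is not previous + 1
    // (widened to u32 so 65535 + 1 cannot overflow).
    1 + sorted
        .windows(2)
        .filter(|w| u32::from(w[1]) != u32::from(w[0]) + 1)
        .count()
}

/// Pick the smallest encoding for one container (approximate sizes in bytes).
fn best_container(sorted: &[u16]) -> Container {
    let array_bytes = 2 * sorted.len();
    let bitmap_bytes = 8192;
    let run_bytes = 2 + 4 * count_runs(sorted);
    if run_bytes < array_bytes.min(bitmap_bytes) {
        Container::Run
    } else if array_bytes <= bitmap_bytes {
        Container::Array
    } else {
        Container::Bitmap
    }
}
```

Contiguous ranges favor run containers, sparse values favor arrays, and dense but fragmented values (e.g. every even number) favor the plain bitmap.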







[GitHub] [arrow-datafusion] Jimexist commented on a change in pull request #1841: Implement bitmap_distinct function using roaring bitmap

Posted by GitBox <gi...@apache.org>.
Jimexist commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r815443929



##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+
+use std::fmt::Debug;
+use std::ops::BitOrAssign;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array,
+    UInt32Array, UInt8Array,
+};
+use arrow::datatypes::{DataType, Field};
+use log::error;
+use roaring::RoaringBitmap;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+
+use super::format_state_name;
+
+/// APPROX_DISTINCT aggregate expression

Review comment:
       ```suggestion
   /// BITMAP_DISTINCT aggregate expression
   ```







[GitHub] [arrow-datafusion] Ted-Jiang commented on pull request #1841: Implement bitmap_distinct function using roaring bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#issuecomment-1053913138


   @Jimexist @houqp I have fixed the conflict. If there are no more issues, could you please merge this to avoid further conflicts?





[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r808705218



##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::borrow::Borrow;
+
+use std::fmt::Debug;
+use std::ops::BitOrAssign;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array,
+    UInt32Array, UInt8Array,
+};
+use arrow::datatypes::{DataType, Field};
+use croaring::Bitmap;
+use log::info;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+
+use super::format_state_name;
+
+#[derive(Debug)]
+pub struct BitMapDistinct {
+    name: String,
+    input_data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl BitMapDistinct {
+    /// Create a new bitmapDistinct aggregate function.
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            input_data_type,
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for BitMapDistinct {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// the field of the final result of this aggregation.
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::UInt64, false))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
+            DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32 => Box::new(BitmapDistinctCountAccumulator::try_new()),
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Support for 'bitmap_distinct' for data type {} is not implemented",
+                    other
+                )))
+            }
+        };
+        Ok(accumulator)
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "bitmap_registers"),
+            DataType::Binary,
+            false,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct BitmapDistinctCountAccumulator {
+    bitmap: croaring::bitmap::Bitmap,
+}
+
+impl BitmapDistinctCountAccumulator {
+    fn try_new() -> Self {
+        Self {
+            bitmap: croaring::bitmap::Bitmap::create(),
+        }
+    }
+}
+
+impl Accumulator for BitmapDistinctCountAccumulator {
+    //state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values.
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        //maybe run optimized
+        let buffer = self.bitmap.serialize();
+        Ok(vec![ScalarValue::Binary(Some(buffer))])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let value = &values[0];
+        if value.is_empty() {
+            info!("BitmapDistinctCountAccumulator update_batch in empty batch");
+            return Ok(());
+        }
+        match value.data_type() {
+            DataType::Int8 => {
+                let array = value.as_any().downcast_ref::<Int8Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::Int16 => {
+                let array = value.as_any().downcast_ref::<Int16Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::Int32 => {
+                let array = value.as_any().downcast_ref::<Int32Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::UInt8 => {
+                let array = value.as_any().downcast_ref::<UInt8Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::UInt16 => {
+                let array = value.as_any().downcast_ref::<UInt16Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::UInt32 => {
+                let array = value.as_any().downcast_ref::<UInt32Array>().unwrap();
+                self.bitmap.add_many(array.values());
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BITMAP_COUNT_DISTINCT is not expected to receive the type {:?}",
+                    e
+                )));
+            }
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        assert_eq!(1, states.len(), "expect only 1 element in the states");

Review comment:
       In bitmap_distinct, `merge_batch` gets its input from `state`, and `state` only returns a single bitmap array; I added the assertion to emphasize that.
   Yes, I should log an error rather than panic 👍
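Below is a std-only sketch of the alternative discussed here: check the number of state columns and return an error instead of asserting. Logging alone, as in a later revision of this PR, would still panic on `states[0]` if the slice were empty.

The sketch makes these substitutions:

- `BTreeSet<u32>` stands in for the bitmap.
- The little-endian byte encoding is a hypothetical stand-in for the real roaring serialization.
- The error type is `String`. In DataFusion it would be a `DataFusionError`.

```rust
use std::collections::BTreeSet;

/// Hypothetical state encoding: each value as 4 little-endian bytes.
fn serialize_state(set: &BTreeSet<u32>) -> Vec<u8> {
    set.iter().flat_map(|v| v.to_le_bytes()).collect()
}

/// Union all serialized partial states into `acc`, returning an error
/// (instead of panicking) when the state shape is not what `state` produces.
fn merge_batch(acc: &mut BTreeSet<u32>, states: &[Vec<Vec<u8>>]) -> Result<(), String> {
    if states.len() != 1 {
        return Err(format!("expected 1 state column, found {}", states.len()));
    }
    for bytes in &states[0] {
        if bytes.len() % 4 != 0 {
            return Err("corrupt bitmap state: length is not a multiple of 4".to_string());
        }
        for chunk in bytes.chunks_exact(4) {
            acc.insert(u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]));
        }
    }
    Ok(())
}
```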







[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1841: Implement bitmap_distinct function using roaring bitmap

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r810873400



##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+
+use std::fmt::Debug;
+use std::ops::BitOrAssign;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array,
+    UInt32Array, UInt8Array,
+};
+use arrow::datatypes::{DataType, Field};
+use log::error;
+use roaring::RoaringBitmap;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+
+use super::format_state_name;
+
+/// APPROX_DISTINCT aggregate expression
+#[derive(Debug)]
+pub struct BitMapDistinct {
+    name: String,
+    input_data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl BitMapDistinct {
+    /// Create a new ApproxDistinct aggregate function.
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            input_data_type,
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for BitMapDistinct {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// the field of the final result of this aggregation.
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::UInt64, false))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
+            DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32 => Box::new(BitmapDistinctCountAccumulator::try_new()),
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Support for 'bitmap_distinct' for data type {} is not implemented",
+                    other
+                )))
+            }
+        };
+        Ok(accumulator)
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "bitmap_registers"),
+            DataType::Binary,
+            false,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct BitmapDistinctCountAccumulator {
+    bitmap: roaring::bitmap::RoaringBitmap,
+}
+
+impl BitmapDistinctCountAccumulator {
+    fn try_new() -> Self {
+        Self {
+            bitmap: RoaringBitmap::new(),
+        }
+    }
+}
+
+impl Accumulator for BitmapDistinctCountAccumulator {
+    //state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values.
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        let mut bytes = vec![];
+        self.bitmap.serialize_into(&mut bytes).unwrap();
+        Ok(vec![ScalarValue::Binary(Some(bytes))])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let value = &values[0];
+        if value.is_empty() {
+            return Ok(());
+        }
+        match value.data_type() {
+            DataType::Int8 => {
+                let array = value.as_any().downcast_ref::<Int8Array>().unwrap();
+                for i in 0..array.len() {
+                    self.bitmap.insert(array.value(i) as u32);
+                }
+            }
+            DataType::Int16 => {
+                let array = value.as_any().downcast_ref::<Int16Array>().unwrap();
+                for i in 0..array.len() {
+                    self.bitmap.insert(array.value(i) as u32);
+                }
+            }
+            DataType::Int32 => {
+                let array = value.as_any().downcast_ref::<Int32Array>().unwrap();
+                for i in 0..array.len() {
+                    self.bitmap.insert(array.value(i) as u32);
+                }
+            }
+            DataType::UInt8 => {
+                let array = value.as_any().downcast_ref::<UInt8Array>().unwrap();
+                for i in 0..array.len() {
+                    self.bitmap.insert(array.value(i) as u32);
+                }
+            }
+            DataType::UInt16 => {
+                let array = value.as_any().downcast_ref::<UInt16Array>().unwrap();
+                for i in 0..array.len() {
+                    self.bitmap.insert(array.value(i) as u32);
+                }
+            }
+            DataType::UInt32 => {
+                let array = value.as_any().downcast_ref::<UInt32Array>().unwrap();
+                for i in 0..array.len() {
+                    self.bitmap.insert(array.value(i));
+                }
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BITMAP_COUNT_DISTINCT is not expected to receive the type {:?}",
+                    e
+                )));
+            }
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        if states.len() != 1 {
+            error!(
+                "expect only 1 element in the states but found {:?}",
+                &states.len()
+            )
+        }
+
+        let binary_array = states[0].as_any().downcast_ref::<BinaryArray>().unwrap();
+
+        for b in binary_array.iter() {
+            let v = b.ok_or_else(|| {
+                DataFusionError::Internal(
+                    "Impossibly got empty binary array from states".into(),
+                )
+            })?;
+            let bitmap = RoaringBitmap::deserialize_from(&v.to_vec()[..]).unwrap();
+            self.bitmap.bitor_assign(bitmap);
+        }
+        Ok(())
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::from(self.bitmap.len()))
+    }
+}
+
+pub(crate) fn is_bitmap_count_distinct_supported_arg_type(arg_type: &DataType) -> bool {

Review comment:
       Do we need to support U64, I64, or other numeric types?
   @Ted-Jiang
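If 64-bit support is wanted, one option is a two-level, "treemap"-style structure. The high 32 bits pick a bucket and the low 32 bits go into a 32-bit set. As far as I know, roaring-rs ships a `RoaringTreemap` type built on this idea.

The sketch below is std-only and hypothetical: `DistinctU64` is an invented name, and `BTreeSet` stands in for a 32-bit roaring bitmap. Signed input is handled by reinterpreting its bits as unsigned, which keeps distinct values distinct.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical treemap-style distinct set for 64-bit values.
#[derive(Default)]
struct DistinctU64 {
    buckets: BTreeMap<u32, BTreeSet<u32>>,
}

impl DistinctU64 {
    fn insert(&mut self, v: u64) {
        let high = (v >> 32) as u32; // selects the bucket
        let low = v as u32; // low 32 bits stored inside the bucket
        self.buckets.entry(high).or_default().insert(low);
    }

    /// `as u64` reinterprets two's-complement bits, a one-to-one mapping,
    /// so distinct i64 values remain distinct.
    fn insert_i64(&mut self, v: i64) {
        self.insert(v as u64);
    }

    /// Exact distinct count across all buckets.
    fn len(&self) -> u64 {
        self.buckets.values().map(|s| s.len() as u64).sum()
    }
}
```

Unlike the `as u32` cast, this keeps `0` and `1 << 32` apart, because they land in different buckets.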







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1841: Implement bitmap_distinct function using roaring bitmap

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r816834403



##########
File path: datafusion-physical-expr/Cargo.toml
##########
@@ -41,3 +41,4 @@ arrow = { version = "9.0.0", features = ["prettyprint"] }
 paste = "^1.0"
 ahash = { version = "0.7", default-features = false }
 ordered-float = "2.10"
+roaring = "0.8.1"

Review comment:
       What do you think about making this an optional dependency (like the crypto expressions, etc.), as is done on master?
   
   That would let anyone who wants this feature use it, without requiring it for everyone who does not.
   
   https://github.com/apache/arrow-datafusion/blob/7eb3bd8/datafusion-physical-expr/Cargo.toml#L35-L39
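Below is a sketch of how the optional-dependency approach could look. The feature name `bitmap_expressions` is hypothetical, and the Cargo.toml part is shown as comments. When the feature is off, the function still exists but returns an error, so callers get a clear message. In the real crate the error would be `DataFusionError::NotImplemented` rather than `String`.

```rust
// Cargo.toml side (hypothetical feature name):
//
// [dependencies]
// roaring = { version = "0.8.1", optional = true }
//
// [features]
// bitmap_expressions = ["roaring"]

/// Compiled only with the feature enabled; this is where the
/// roaring-backed accumulator would be constructed.
#[cfg(feature = "bitmap_expressions")]
fn create_bitmap_distinct() -> Result<&'static str, String> {
    Ok("bitmap_distinct accumulator")
}

/// Fallback without the feature: a clear error instead of a missing symbol.
#[cfg(not(feature = "bitmap_expressions"))]
fn create_bitmap_distinct() -> Result<&'static str, String> {
    Err("bitmap_distinct requires the `bitmap_expressions` feature".to_string())
}
```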

##########
File path: datafusion-physical-expr/src/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+
+use std::fmt::Debug;
+use std::ops::BitOrAssign;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array,
+    UInt32Array, UInt8Array,
+};
+use arrow::datatypes::{DataType, Field};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+use roaring::RoaringBitmap;
+
+use crate::{AggregateExpr, PhysicalExpr};
+
+use super::format_state_name;
+
+/// BITMAP_DISTINCT aggregate expression
+#[derive(Debug)]
+pub struct BitMapDistinct {
+    name: String,
+    input_data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl BitMapDistinct {
+    /// Create a new BitmapDistinct aggregate function.
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            input_data_type,
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for BitMapDistinct {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// the field of the final result of this aggregation.
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::UInt64, false))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
+            DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32 => Box::new(BitmapDistinctCountAccumulator::try_new()),
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Support for 'bitmap_distinct' for data type {} is not implemented",
+                    other
+                )))
+            }
+        };
+        Ok(accumulator)
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "bitmap_registers"),
+            DataType::Binary,
+            false,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct BitmapDistinctCountAccumulator {
+    bitmap: roaring::bitmap::RoaringBitmap,
+}
+
+impl BitmapDistinctCountAccumulator {
+    fn try_new() -> Self {
+        Self {
+            bitmap: RoaringBitmap::new(),
+        }
+    }
+}
+
+impl Accumulator for BitmapDistinctCountAccumulator {
+    //state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values.
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        let mut bytes = vec![];
+        self.bitmap.serialize_into(&mut bytes).unwrap();
+        Ok(vec![ScalarValue::Binary(Some(bytes))])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let value = &values[0];
+        if value.is_empty() {
+            return Ok(());
+        }
+        match value.data_type() {
+            DataType::Int8 => {
+                let array = value.as_any().downcast_ref::<Int8Array>().unwrap();
+                for i in 0..array.len() {

Review comment:
       This code doesn't seem to handle nulls (as in, don't you have to check `array.is_valid(i)` before calling `array.value(i)`?).
   
   A test case for null values would probably be useful.
   
   Maybe you could use an iterator like:
   
   ```rust
   for value in array.iter() {
     match value {
       Some(v) => {
         self.bitmap.insert(v as u32);
       }
       None => {
         // do something with NULLs here
       }
     }
   }
   ```
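A minimal, std-only sketch of the null-aware loop suggested above. It makes three assumptions:

- A plain `&[Option<i32>]` slice stands in for an Arrow `Int32Array`, whose `iter()` likewise yields `Option` values.
- A `BTreeSet<u32>` stands in for the roaring bitmap.
- `distinct_non_null` is a hypothetical helper, not part of the PR.

NULL slots are skipped, matching SQL `COUNT(DISTINCT ...)`. Reading `values()` directly would instead also count whatever value happens to sit in a NULL slot. The sketch also shows that the `as u32` cast used in the PR keeps distinct signed inputs distinct; for example, `-1` becomes `u32::MAX`.

```rust
use std::collections::BTreeSet;

/// Hypothetical helper: counts distinct non-null values.
/// `BTreeSet<u32>` is an assumed stand-in for a roaring bitmap.
fn distinct_non_null(values: &[Option<i32>]) -> u64 {
    let mut set: BTreeSet<u32> = BTreeSet::new();
    for value in values.iter() {
        match value {
            // Reinterpreting i32 bits as u32 is one-to-one,
            // so distinct signed inputs stay distinct.
            Some(v) => {
                set.insert(*v as u32);
            }
            // NULLs are ignored, matching SQL COUNT(DISTINCT ...) semantics.
            None => {}
        }
    }
    set.len() as u64
}

fn main() {
    let input = [Some(1), None, Some(-1), Some(1), None, Some(7)];
    println!("{}", distinct_non_null(&input)); // prints 3
}
```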




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #1841: Implement bitmap_distinct function using roaring bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r815543290



##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+use std::any::Any;
+
+use std::fmt::Debug;
+use std::ops::BitOrAssign;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array,
+    UInt32Array, UInt8Array,
+};
+use arrow::datatypes::{DataType, Field};
+use log::error;
+use roaring::RoaringBitmap;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+
+use super::format_state_name;
+
+/// APPROX_DISTINCT aggregate expression

Review comment:
       Thanks!







[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r809632924



##########
File path: datafusion/Cargo.toml
##########
@@ -81,6 +81,7 @@ num-traits = { version = "0.2", optional = true }
 pyo3 = { version = "0.15", optional = true }
 tempfile = "3"
 parking_lot = "0.12"
+croaring = "0.5.1"

Review comment:
       @alamb Thanks for your reply. I agree it's complicated to add a new dependency on `clang` to DataFusion.
   IMO, we still need a high-efficiency roaring bitmap implementation. IOx and other high-end OLAP systems use one to enable optimizations such as late materialization. Maybe we should switch from `croaring` to `roaring-rs`: it is written entirely in Rust and won't cost too much compile time (I also found that DataFusion compiles/builds in parallel). Maybe one day it will catch up with `croaring` 😂







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r809028680



##########
File path: .github/workflows/rust.yml
##########
@@ -58,6 +58,11 @@ jobs:
           rustup toolchain install ${{ matrix.rust }}
           rustup default ${{ matrix.rust }}
           rustup component add rustfmt
+      - name: Set up Clang

Review comment:
       I think Apache projects generally try not to use "third-party" GitHub actions (i.e., actions not hosted by GitHub or Apache itself), so I am worried about using this action.







[GitHub] [arrow-datafusion] e-dard commented on a change in pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
e-dard commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r808759307



##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+use std::any::Any;
+use std::borrow::Borrow;
+
+use std::fmt::Debug;
+use std::ops::BitOrAssign;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array,
+    UInt32Array, UInt8Array,
+};
+use arrow::datatypes::{DataType, Field};
+use croaring::Bitmap;
+use log::info;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+
+use super::format_state_name;
+
+#[derive(Debug)]
+pub struct BitMapDistinct {
+    name: String,
+    input_data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl BitMapDistinct {
+    /// Create a new bitmapDistinct aggregate function.
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            input_data_type,
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for BitMapDistinct {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// the field of the final result of this aggregation.
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::UInt64, false))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
+            DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32 => Box::new(BitmapDistinctCountAccumulator::try_new()),
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Support for 'bitmap_distinct' for data type {} is not implemented",
+                    other
+                )))
+            }
+        };
+        Ok(accumulator)
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "bitmap_registers"),
+            DataType::Binary,
+            false,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct BitmapDistinctCountAccumulator {
+    bitmap: croaring::bitmap::Bitmap,
+}
+
+impl BitmapDistinctCountAccumulator {
+    fn try_new() -> Self {
+        Self {
+            bitmap: croaring::bitmap::Bitmap::create(),
+        }
+    }
+}
+
+impl Accumulator for BitmapDistinctCountAccumulator {
+    //state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values.
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        //maybe run optimized
+        let buffer = self.bitmap.serialize();
+        Ok(vec![ScalarValue::Binary(Some(buffer))])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let value = &values[0];
+        if value.is_empty() {
+            info!("BitmapDistinctCountAccumulator update_batch in empty batch");
+            return Ok(());
+        }
+        match value.data_type() {
+            DataType::Int8 => {
+                let array = value.as_any().downcast_ref::<Int8Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::Int16 => {
+                let array = value.as_any().downcast_ref::<Int16Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::Int32 => {
+                let array = value.as_any().downcast_ref::<Int32Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::UInt8 => {
+                let array = value.as_any().downcast_ref::<UInt8Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::UInt16 => {
+                let array = value.as_any().downcast_ref::<UInt16Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::UInt32 => {
+                let array = value.as_any().downcast_ref::<UInt32Array>().unwrap();
+                self.bitmap.add_many(array.values());
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BITMAP_COUNT_DISTINCT is not expected to receive the type {:?}",
+                    e
+                )));
+            }
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        assert_eq!(1, states.len(), "expect only 1 element in the states");
+        let binary_array = states[0].as_any().downcast_ref::<BinaryArray>().unwrap();
+        let bitmaps = binary_array
+            .iter()
+            .map(|x| {
+                Bitmap::deserialize(x.expect(
+                    "Impossibly got empty binary array from states in bitmap_distinct",
+                ))
+            })
+            .collect::<Vec<Bitmap>>();
+        let bitmaps = &bitmaps.iter().map(|x| x.borrow()).collect::<Vec<&Bitmap>>()[..];
+        //Do not use self.bitmap = xxx, because '=' has been written for 'BitAnd'!
+        self.bitmap.bitor_assign(Bitmap::fast_or(bitmaps));
+        Ok(())
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {

Review comment:
       No, this is the only way. I was just checking that it is called only once, for the final result.
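Here is a simplified, std-only sketch of the two-phase flow that explains why `evaluate` runs only once per group:

1. Each partition's accumulator calls `update_batch`.
2. Each partition serializes its partial result via `state`.
3. A final accumulator unions every state in `merge_batch`.
4. `evaluate` then just returns the cardinality as a `u64`, matching the `UInt64` result field.

The `DistinctAcc` type, its inherent methods, the `BTreeSet<u32>` stand-in for the bitmap, and the little-endian byte encoding are all hypothetical. This is not DataFusion's `Accumulator` trait or the roaring serialization format.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for a bitmap-backed distinct-count accumulator;
/// `BTreeSet<u32>` replaces the roaring bitmap so the sketch needs no crates.
#[derive(Debug, Default)]
struct DistinctAcc {
    set: BTreeSet<u32>,
}

impl DistinctAcc {
    /// Partial phase: add one batch of (already null-filtered) values.
    fn update_batch(&mut self, values: &[u32]) {
        self.set.extend(values.iter().copied());
    }

    /// Serialize the partial result as concatenated little-endian u32s
    /// (an illustrative encoding, not the roaring format).
    fn state(&self) -> Vec<u8> {
        self.set.iter().flat_map(|v| v.to_le_bytes()).collect()
    }

    /// Final phase: union every serialized partial state into `self`.
    fn merge_batch(&mut self, states: &[Vec<u8>]) {
        for bytes in states {
            for chunk in bytes.chunks_exact(4) {
                let v = u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
                self.set.insert(v);
            }
        }
    }

    /// Called once on the final accumulator: the cardinality is the result.
    fn evaluate(&self) -> u64 {
        self.set.len() as u64
    }
}

fn main() {
    // Two partitions see overlapping values.
    let mut p1 = DistinctAcc::default();
    p1.update_batch(&[1, 2, 3]);
    let mut p2 = DistinctAcc::default();
    p2.update_batch(&[3, 4]);

    // The final accumulator merges both partial states, then evaluates once.
    let mut final_acc = DistinctAcc::default();
    final_acc.merge_batch(&[p1.state(), p2.state()]);
    println!("{}", final_acc.evaluate()); // prints 4
}
```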







[GitHub] [arrow-datafusion] Ted-Jiang commented on pull request #1841: Implement bitmap_distinct function using roaring bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#issuecomment-1057707894


   > @Ted-Jiang
   > 
   > FYI https://github.com/RoaringBitmap/roaring-rs/releases/tag/v0.9.0 was just released (with big performance optimizations)
   
   @Dandandan Thanks for the info, I may redo the benchmark.





[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r808719933



##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,233 @@
+    fn evaluate(&self) -> Result<ScalarValue> {

Review comment:
       Sorry, is there something I missed? Is there a more efficient API to get the number of integers contained in the bitmap?







[GitHub] [arrow-datafusion] Dandandan commented on pull request #1841: Implement bitmap_distinct function using roaring bitmap

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#issuecomment-1056837058


   @Ted-Jiang 
   
   FYI https://github.com/RoaringBitmap/roaring-rs/releases/tag/v0.9.0 was just released (with big performance optimizations)





[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r808719208



##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,233 @@
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        assert_eq!(1, states.len(), "expect only 1 element in the states");
+        let binary_array = states[0].as_any().downcast_ref::<BinaryArray>().unwrap();
+        let bitmaps = binary_array
+            .iter()
+            .map(|x| {
+                Bitmap::deserialize(x.expect(
+                    "Impossibly got empty binary array from states in bitmap_distinct",
+                ))
+            })
+            .collect::<Vec<Bitmap>>();
+        let bitmaps = &bitmaps.iter().map(|x| x.borrow()).collect::<Vec<&Bitmap>>()[..];
+        //Do not use self.bitmap = xxx, because '=' has been written for 'BitAnd'!
+        self.bitmap.bitor_assign(Bitmap::fast_or(bitmaps));
+        Ok(())

Review comment:
       Thanks. I previously used `or_inplace`.
   From the [CRoaring README](https://github.com/RoaringBitmap/CRoaring#dealing-with-large-volumes):
   > Some users have to deal with large volumes of data. It may be important for these users to be aware of the addMany (C++) roaring_bitmap_or_many (C) functions as it is much faster and economical to add values in batches when possible. Furthermore, calling periodically the runOptimize (C++) or roaring_bitmap_run_optimize (C) functions may help.
   
   `fast_or` uses `roaring_bitmap_or_many` under the hood, but in my local tests there was little difference between the two. I may re-test with larger data volumes.
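   
   Set union is associative and commutative, so OR-ing the partial states in one at a time and OR-ing them all in a single multi-way call must produce the same bitmap; only the speed can differ. A std-only sketch of the two strategies, assuming a `BTreeSet<u32>` as a stand-in for a Roaring bitmap (`Bitmap`, `merge_pairwise`, and `merge_many` are hypothetical names, not croaring APIs):
   
   ```rust
   use std::collections::BTreeSet;
   
   /// Hypothetical stand-in for a Roaring bitmap: an ordered set of u32 values.
   pub type Bitmap = BTreeSet<u32>;
   
   /// OR each partial state into the accumulator one at a time
   /// (analogous to calling an in-place OR once per state).
   pub fn merge_pairwise(acc: &mut Bitmap, states: &[Bitmap]) {
       for state in states {
           acc.extend(state.iter().copied());
       }
   }
   
   /// Build the union of all partial states in one pass, then OR it into
   /// the accumulator (analogous to a single multi-way OR over all states).
   pub fn merge_many(acc: &mut Bitmap, states: &[Bitmap]) {
       let combined: Bitmap = states.iter().flat_map(|s| s.iter().copied()).collect();
       acc.extend(combined);
   }
   ```
   
   With states `{1, 2, 3}`, `{3, 4}` and `{4, 5, 6}`, both functions yield the same six-element set, so any gap measured between `or_inplace` and `fast_or` is a performance effect, not a correctness one.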
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Ted-Jiang commented on pull request #1841: Implement bitmap_distinct function using roaring bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#issuecomment-1054067096


   @Jimexist @houqp I have fixed the conflict. If there are no more issues, could you please merge this to avoid further conflicts? 😊





[GitHub] [arrow-datafusion] Ted-Jiang removed a comment on pull request #1841: Implement bitmap_distinct function using roaring bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang removed a comment on pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#issuecomment-1053913138


   @Jimexist @houqp I have fixed the conflict. If there are no more issues, could you please merge this to avoid any further conflicts?





[GitHub] [arrow-datafusion] Ted-Jiang commented on pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#issuecomment-1046497688


   @houqp @alamb I have switched to the pure-Rust bitmap implementation. Please review 😊





[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #1841: Implement bitmap_distinct function using roaring bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r811094933



##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+
+use std::fmt::Debug;
+use std::ops::BitOrAssign;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array,
+    UInt32Array, UInt8Array,
+};
+use arrow::datatypes::{DataType, Field};
+use log::error;
+use roaring::RoaringBitmap;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+
+use super::format_state_name;
+
+/// APPROX_DISTINCT aggregate expression
+#[derive(Debug)]
+pub struct BitMapDistinct {
+    name: String,
+    input_data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl BitMapDistinct {
+    /// Create a new ApproxDistinct aggregate function.
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            input_data_type,
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for BitMapDistinct {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// the field of the final result of this aggregation.
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::UInt64, false))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
+            DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32 => Box::new(BitmapDistinctCountAccumulator::try_new()),
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Support for 'bitmap_distinct' for data type {} is not implemented",
+                    other
+                )))
+            }
+        };
+        Ok(accumulator)
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "bitmap_registers"),
+            DataType::Binary,
+            false,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct BitmapDistinctCountAccumulator {
+    bitmap: roaring::bitmap::RoaringBitmap,
+}
+
+impl BitmapDistinctCountAccumulator {
+    fn try_new() -> Self {
+        Self {
+            bitmap: RoaringBitmap::new(),
+        }
+    }
+}
+
+impl Accumulator for BitmapDistinctCountAccumulator {
+    //state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values.
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        let mut bytes = vec![];
+        self.bitmap.serialize_into(&mut bytes).unwrap();
+        Ok(vec![ScalarValue::Binary(Some(bytes))])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let value = &values[0];
+        if value.is_empty() {
+            return Ok(());
+        }
+        match value.data_type() {
+            DataType::Int8 => {
+                let array = value.as_any().downcast_ref::<Int8Array>().unwrap();
+                for i in 0..array.len() {
+                    self.bitmap.insert(array.value(i) as u32);
+                }
+            }
+            DataType::Int16 => {
+                let array = value.as_any().downcast_ref::<Int16Array>().unwrap();
+                for i in 0..array.len() {
+                    self.bitmap.insert(array.value(i) as u32);
+                }
+            }
+            DataType::Int32 => {
+                let array = value.as_any().downcast_ref::<Int32Array>().unwrap();
+                for i in 0..array.len() {
+                    self.bitmap.insert(array.value(i) as u32);
+                }
+            }
+            DataType::UInt8 => {
+                let array = value.as_any().downcast_ref::<UInt8Array>().unwrap();
+                for i in 0..array.len() {
+                    self.bitmap.insert(array.value(i) as u32);
+                }
+            }
+            DataType::UInt16 => {
+                let array = value.as_any().downcast_ref::<UInt16Array>().unwrap();
+                for i in 0..array.len() {
+                    self.bitmap.insert(array.value(i) as u32);
+                }
+            }
+            DataType::UInt32 => {
+                let array = value.as_any().downcast_ref::<UInt32Array>().unwrap();
+                for i in 0..array.len() {
+                    self.bitmap.insert(array.value(i));
+                }
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BITMAP_COUNT_DISTINCT is not expected to receive the type {:?}",
+                    e
+                )));
+            }
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        if states.len() != 1 {
+            error!(
+                "expect only 1 element in the states but found {:?}",
+                &states.len()
+            )
+        }
+
+        let binary_array = states[0].as_any().downcast_ref::<BinaryArray>().unwrap();
+
+        for b in binary_array.iter() {
+            let v = b.ok_or_else(|| {
+                DataFusionError::Internal(
+                    "Impossibly got empty binary array from states".into(),
+                )
+            })?;
+            let bitmap = RoaringBitmap::deserialize_from(&v.to_vec()[..]).unwrap();
+            self.bitmap.bitor_assign(bitmap);
+        }
+        Ok(())
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::from(self.bitmap.len()))
+    }
+}
+
+pub(crate) fn is_bitmap_count_distinct_supported_arg_type(arg_type: &DataType) -> bool {

Review comment:
       @liukun4515 AFAIK, `roaring-rs` only supports inserting u32 and i32 values. Casting u64 to u32 may produce incorrect results.
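   
   To make the truncation risk concrete: a 32-bit Roaring bitmap stores `u32` keys, so every input has to be mapped into that space first. A std-only sketch, assuming a `HashSet<u32>` as a stand-in for the bitmap (`distinct_after_cast` and `distinct_exact` are hypothetical helpers): `u64 as u32` keeps only the low 32 bits, so distinct inputs can collide, while `i32 as u32` reinterprets the same 32 bits and never collides. The `Int8`/`Int16` casts in `update_batch` are also safe, because `as` sign-extends them injectively into `u32`.
   
   ```rust
   use std::collections::HashSet;
   use std::hash::Hash;
   
   /// Count distinct values after mapping each one into the u32 key space
   /// of a 32-bit bitmap (a HashSet<u32> stands in for the bitmap here).
   pub fn distinct_after_cast<T: Copy, F: Fn(T) -> u32>(values: &[T], to_u32: F) -> usize {
       values.iter().map(|&v| to_u32(v)).collect::<HashSet<u32>>().len()
   }
   
   /// Exact distinct count on the original values, for comparison.
   pub fn distinct_exact<T: Copy + Eq + Hash>(values: &[T]) -> usize {
       values.iter().copied().collect::<HashSet<T>>().len()
   }
   ```
   
   For example, `1u64` and `(1u64 << 32) + 1` are two distinct values, but both become `1u32` after the cast, so the bitmap would count them once.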







[GitHub] [arrow-datafusion] Ted-Jiang commented on pull request #1841: Implement bitmap_distinct function using roaring bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#issuecomment-1053912928


   @Jimexist @houqp I have fixed the conflict. If there are no more issues, could you please merge this to avoid any further conflicts?





[GitHub] [arrow-datafusion] e-dard commented on a change in pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
e-dard commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r807807819



##########
File path: datafusion/Cargo.toml
##########
@@ -81,6 +81,8 @@ num-traits = { version = "0.2", optional = true }
 pyo3 = { version = "0.15", optional = true }
 tempfile = "3"
 parking_lot = "0.12"
+#roaring = "0.8.1"

Review comment:
       Cruft?
   
   ```suggestion
   ```

##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::borrow::Borrow;
+
+use std::fmt::Debug;
+use std::ops::BitOrAssign;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array,
+    UInt32Array, UInt8Array,
+};
+use arrow::datatypes::{DataType, Field};
+use croaring::Bitmap;
+use log::info;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+
+use super::format_state_name;
+
+#[derive(Debug)]
+pub struct BitMapDistinct {
+    name: String,
+    input_data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl BitMapDistinct {
+    /// Create a new bitmapDistinct aggregate function.
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            input_data_type,
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for BitMapDistinct {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// the field of the final result of this aggregation.
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::UInt64, false))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
+            DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32 => Box::new(BitmapDistinctCountAccumulator::try_new()),
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Support for 'bitmap_distinct' for data type {} is not implemented",
+                    other
+                )))
+            }
+        };
+        Ok(accumulator)
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "bitmap_registers"),
+            DataType::Binary,
+            false,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct BitmapDistinctCountAccumulator {
+    bitmap: croaring::bitmap::Bitmap,
+}
+
+impl BitmapDistinctCountAccumulator {
+    fn try_new() -> Self {
+        Self {
+            bitmap: croaring::bitmap::Bitmap::create(),
+        }
+    }
+}
+
+impl Accumulator for BitmapDistinctCountAccumulator {
+    //state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values.
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        //maybe run optimized
+        let buffer = self.bitmap.serialize();
+        Ok(vec![ScalarValue::Binary(Some(buffer))])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let value = &values[0];
+        if value.is_empty() {
+            info!("BitmapDistinctCountAccumulator update_batch in empty batch");

Review comment:
       Does this need to be logged at `info` if it's not an error state?

##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,233 @@
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let value = &values[0];
+        if value.is_empty() {
+            info!("BitmapDistinctCountAccumulator update_batch in empty batch");
+            return Ok(());
+        }
+        match value.data_type() {
+            DataType::Int8 => {
+                let array = value.as_any().downcast_ref::<Int8Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::Int16 => {
+                let array = value.as_any().downcast_ref::<Int16Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::Int32 => {
+                let array = value.as_any().downcast_ref::<Int32Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::UInt8 => {
+                let array = value.as_any().downcast_ref::<UInt8Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::UInt16 => {
+                let array = value.as_any().downcast_ref::<UInt16Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::UInt32 => {
+                let array = value.as_any().downcast_ref::<UInt32Array>().unwrap();
+                self.bitmap.add_many(array.values());
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BITMAP_COUNT_DISTINCT is not expected to receive the type {:?}",
+                    e
+                )));
+            }
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        assert_eq!(1, states.len(), "expect only 1 element in the states");
+        let binary_array = states[0].as_any().downcast_ref::<BinaryArray>().unwrap();
+        let bitmaps = binary_array
+            .iter()
+            .map(|x| {
+                Bitmap::deserialize(x.expect(
+                    "Impossibly got empty binary array from states in bitmap_distinct",
+                ))
+            })
+            .collect::<Vec<Bitmap>>();
+        let bitmaps = &bitmaps.iter().map(|x| x.borrow()).collect::<Vec<&Bitmap>>()[..];
+        //Do not use self.bitmap = xxx, because '=' has been wrote for 'BitAnd' !.
+        self.bitmap.bitor_assign(Bitmap::fast_or(bitmaps));
+        Ok(())
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {

Review comment:
       I assume this generally only gets called once per query? Only asking because `cardinality` is quite an expensive call.
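   
   For context on where that cost lands: in a two-phase (partial/final) aggregation, `update_batch` typically runs once per input batch and `state`/`merge_batch` once per partition, while `evaluate` runs once per group at the very end. A simplified std-only sketch of that flow, assuming a hypothetical `DistinctAcc` whose methods only mirror the shape of the `Accumulator` methods in the diff (a `BTreeSet<u32>` stands in for the bitmap and a `Vec<u32>` for the serialized `Binary` state):
   
   ```rust
   use std::collections::BTreeSet;
   
   /// Hypothetical, simplified accumulator; not DataFusion's trait.
   #[derive(Default)]
   pub struct DistinctAcc {
       set: BTreeSet<u32>,
   }
   
   impl DistinctAcc {
       /// Partial phase: called for each input batch.
       pub fn update_batch(&mut self, values: &[u32]) {
           self.set.extend(values.iter().copied());
       }
   
       /// Partial phase: export this accumulator's state for merging.
       pub fn state(&self) -> Vec<u32> {
           self.set.iter().copied().collect()
       }
   
       /// Final phase: fold in the states of the partial accumulators.
       pub fn merge_batch(&mut self, states: &[Vec<u32>]) {
           for s in states {
               self.set.extend(s.iter().copied());
           }
       }
   
       /// Final phase: called after all merges; this is where the
       /// distinct count (the bitmap cardinality) is computed.
       pub fn evaluate(&self) -> u64 {
           self.set.len() as u64
       }
   }
   ```
   
   In this flow the count is computed once per group, after every batch has been merged, rather than once per batch.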

##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,233 @@
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        assert_eq!(1, states.len(), "expect only 1 element in the states");

Review comment:
       You have the option to return an error here. Is that perhaps preferable to a hard crash? Higher up the call chain, more context could then be added to help debugging.
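   
   One way to act on this suggestion, sketched with std-only stand-ins: a `String` error plays the role of `DataFusionError::Internal`, and each state row is an `Option<Vec<u32>>` in place of a nullable `BinaryArray` entry holding a serialized bitmap (the `StateColumn` alias and this free-function `merge_batch` are hypothetical). It checks the column count and null rows with `ok_or_else` and `?`, the same pattern the roaring-rs revision of this file uses for null states, so the caller receives an error instead of a panic.
   
   ```rust
   use std::collections::BTreeSet;
   
   /// One state column: each row is an optional serialized bitmap.
   /// A Vec<u32> stands in for the bytes of a BinaryArray entry.
   pub type StateColumn = Vec<Option<Vec<u32>>>;
   
   /// Merge partial states, reporting malformed input as an Err
   /// instead of panicking via assert_eq! or expect.
   pub fn merge_batch(acc: &mut BTreeSet<u32>, states: &[StateColumn]) -> Result<(), String> {
       // Validate the number of state columns instead of asserting on it.
       if states.len() != 1 {
           return Err(format!(
               "bitmap_distinct expects 1 state column but found {}",
               states.len()
           ));
       }
       for (row, entry) in states[0].iter().enumerate() {
           // A null state is unexpected; surface it with context.
           let values = entry
               .as_ref()
               .ok_or_else(|| format!("bitmap_distinct got a null state at row {}", row))?;
           acc.extend(values.iter().copied());
       }
       Ok(())
   }
   ```
   
   Rows before a failing one are already merged here; if partial updates matter, all rows could be validated before any are applied.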

##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,233 @@
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "bitmap_registers"),
+            DataType::Binary,
+            false,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct BitmapDistinctCountAccumulator {
+    bitmap: croaring::bitmap::Bitmap,
+}
+
+impl BitmapDistinctCountAccumulator {
+    fn try_new() -> Self {
+        Self {
+            bitmap: croaring::bitmap::Bitmap::create(),
+        }
+    }
+}
+
+impl Accumulator for BitmapDistinctCountAccumulator {
+    //state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values.
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        //maybe run optimized

Review comment:
       I think this is a bit of a tricky question because the bitmap only lives for the duration of the query, right? One of the main things that can be optimised is converting array containers into run containers (which use RLE compression), so whether it helps likely depends on the contents of the input. A benchmark with typical data might help answer this.
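
   The following simplified sketch run-length encodes a sorted set of `u32` values to show why the benefit depends on the input. It ignores Roaring's 16-bit chunking. It assumes the container sizes from the Roaring format specification: 2 bytes per value for an array container, and 2 bytes plus 4 bytes per run for a run container. `to_runs` and `approx_sizes` are illustrative helpers, not croaring-rs APIs:

   ```rust
   /// Collapse a sorted, deduplicated slice of u32 values into
   /// (start, length - 1) runs, similar to how a Roaring run container
   /// stores them (simplified: no 16-bit chunking here).
   pub fn to_runs(sorted: &[u32]) -> Vec<(u32, u32)> {
       let mut runs: Vec<(u32, u32)> = Vec::new();
       for &v in sorted {
           if let Some(last) = runs.last_mut() {
               // Last value covered by the current run (u64 avoids overflow).
               let end = last.0 as u64 + last.1 as u64;
               if end + 1 == v as u64 {
                   last.1 += 1; // `v` directly follows the run: extend it
                   continue;
               }
           }
           runs.push((v, 0)); // otherwise start a new run at `v`
       }
       runs
   }

   /// Approximate bytes as (array container, run container) for the input.
   pub fn approx_sizes(sorted: &[u32]) -> (usize, usize) {
       let array_bytes = 2 * sorted.len();
       let run_bytes = 2 + 4 * to_runs(sorted).len();
       (array_bytes, run_bytes)
   }
   ```

   Consecutive values collapse to a handful of runs. Scattered values can make the run form larger than the array form. A benchmark on representative data is therefore a reasonable way to decide.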

##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::borrow::Borrow;
+
+use std::fmt::Debug;
+use std::ops::BitOrAssign;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array,
+    UInt32Array, UInt8Array,
+};
+use arrow::datatypes::{DataType, Field};
+use croaring::Bitmap;
+use log::info;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+
+use super::format_state_name;
+
+#[derive(Debug)]
+pub struct BitMapDistinct {
+    name: String,
+    input_data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl BitMapDistinct {
+    /// Create a new bitmapDistinct aggregate function.
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            input_data_type,
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for BitMapDistinct {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// the field of the final result of this aggregation.
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::UInt64, false))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
+            DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32 => Box::new(BitmapDistinctCountAccumulator::try_new()),
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Support for 'bitmap_distinct' for data type {} is not implemented",
+                    other
+                )))
+            }
+        };
+        Ok(accumulator)
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "bitmap_registers"),
+            DataType::Binary,
+            false,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct BitmapDistinctCountAccumulator {
+    bitmap: croaring::bitmap::Bitmap,
+}
+
+impl BitmapDistinctCountAccumulator {
+    fn try_new() -> Self {
+        Self {
+            bitmap: croaring::bitmap::Bitmap::create(),
+        }
+    }
+}
+
+impl Accumulator for BitmapDistinctCountAccumulator {
+    //state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values.
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        //maybe run optimized
+        let buffer = self.bitmap.serialize();
+        Ok(vec![ScalarValue::Binary(Some(buffer))])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let value = &values[0];
+        if value.is_empty() {
+            info!("BitmapDistinctCountAccumulator update_batch in empty batch");
+            return Ok(());
+        }
+        match value.data_type() {
+            DataType::Int8 => {
+                let array = value.as_any().downcast_ref::<Int8Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::Int16 => {
+                let array = value.as_any().downcast_ref::<Int16Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::Int32 => {
+                let array = value.as_any().downcast_ref::<Int32Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::UInt8 => {
+                let array = value.as_any().downcast_ref::<UInt8Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::UInt16 => {
+                let array = value.as_any().downcast_ref::<UInt16Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::UInt32 => {
+                let array = value.as_any().downcast_ref::<UInt32Array>().unwrap();
+                self.bitmap.add_many(array.values());
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BITMAP_COUNT_DISTINCT is not expected to receive the type {:?}",
+                    e
+                )));
+            }
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        assert_eq!(1, states.len(), "expect only 1 element in the states");
+        let binary_array = states[0].as_any().downcast_ref::<BinaryArray>().unwrap();
+        let bitmaps = binary_array
+            .iter()
+            .map(|x| {
+                Bitmap::deserialize(x.expect(
+                    "Impossibly got empty binary array from states in bitmap_distinct",
+                ))
+            })
+            .collect::<Vec<Bitmap>>();
+        let bitmaps = &bitmaps.iter().map(|x| x.borrow()).collect::<Vec<&Bitmap>>()[..];
+        //Do not use self.bitmap = xxx, because '=' has been wrote for 'BitAnd' !.
+        self.bitmap.bitor_assign(Bitmap::fast_or(bitmaps));
+        Ok(())

Review comment:
       I might be way off here (sorry if so), but when I look at this I wonder if you can simplify it to this:
   
   ```suggestion
           for data in binary_array.iter().flatten() {
               self.bitmap.or_inplace(&Bitmap::deserialize(data));
           }
           Ok(())
   ```
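
   Below is a std-only sketch of the suggested merge loop. It uses a `BTreeSet<u32>` as a stand-in for the croaring `Bitmap`, plus a toy little-endian serialization in place of `Bitmap::serialize`/`Bitmap::deserialize`; all helper names here are hypothetical. The key point is that `iter().flatten()` skips null states instead of panicking in `expect`. Each state is also OR-ed into the accumulator in place, rather than first being collected into a temporary `Vec<Bitmap>`:

   ```rust
   use std::collections::BTreeSet;

   /// Hypothetical serialization standing in for `Bitmap::serialize`:
   /// each value written as 4 little-endian bytes.
   pub fn serialize(set: &BTreeSet<u32>) -> Vec<u8> {
       set.iter().flat_map(|v| v.to_le_bytes()).collect()
   }

   /// Inverse of `serialize`, standing in for `Bitmap::deserialize`.
   pub fn deserialize(bytes: &[u8]) -> BTreeSet<u32> {
       bytes
           .chunks_exact(4)
           .map(|c| u32::from_le_bytes([c[0], c[1], c[2], c[3]]))
           .collect()
   }

   /// Merge serialized partial states into the accumulator, OR-ing each one
   /// in place. `flatten()` skips null entries instead of panicking on them.
   pub fn merge_states(acc: &mut BTreeSet<u32>, states: &[Option<Vec<u8>>]) {
       for data in states.iter().flatten() {
           acc.extend(deserialize(data));
       }
   }
   ```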




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Ted-Jiang commented on pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#issuecomment-1042631149


   
   > * I wonder if/how this gets things closer to being able to do distinct on compressed data (in DF's case on dictionary encoded columns). The problem (as I understand it) is that there is no guarantee that Arrow dictionaries have the same encoded representation for a value across batches, or even in the same record batch (if I remember how dictionary concatenation currently works in Arrow).
   
   Yes, there is no guarantee that Arrow dictionaries have the same encoded representation for a value across batches.
   We plan to maintain a global dictionary that encodes string columns into 32-bit integers to accelerate count distinct.
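
   Here is a rough sketch of the global-dictionary idea. It assumes a hypothetical `GlobalDictionary` that hands out stable `u32` ids per distinct string, with a `BTreeSet<u32>` standing in for the Roaring bitmap. Ids are assigned once per value rather than once per batch, so the same string maps to the same id in every record batch:

   ```rust
   use std::collections::{BTreeSet, HashMap};

   /// Hypothetical global dictionary: assigns each distinct string a stable
   /// 32-bit id, so ids stay consistent across record batches (unlike
   /// per-batch Arrow dictionary keys).
   #[derive(Default)]
   pub struct GlobalDictionary {
       ids: HashMap<String, u32>,
   }

   impl GlobalDictionary {
       /// Return the id for `value`, assigning the next free id if unseen.
       /// Assumes fewer than 2^32 distinct strings.
       pub fn encode(&mut self, value: &str) -> u32 {
           if let Some(&id) = self.ids.get(value) {
               return id;
           }
           let id = self.ids.len() as u32;
           self.ids.insert(value.to_string(), id);
           id
       }
   }

   /// Count distinct strings across batches by inserting their global ids
   /// into a set (standing in for a Roaring bitmap).
   pub fn count_distinct(dict: &mut GlobalDictionary, batches: &[Vec<&str>]) -> u64 {
       let mut seen = BTreeSet::new();
       for batch in batches {
           for s in batch {
               seen.insert(dict.encode(s));
           }
       }
       seen.len() as u64
   }
   ```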
   
   > * Would this work on 64-bit columns if they could first be casted to 32-bit? That is, assuming the contents of the 64-bit column actually fit as 32-bit unsigned integers?
   
   IMO, casting would lose the upper 32 bits, so the result would be incorrect.
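
   This small sketch illustrates the concern; the helper names are hypothetical. A truncating `as u32` cast keeps only the low 32 bits, so 64-bit values that differ only in their upper bits collide and the distinct count drops. When the values are expected to fit, a checked conversion can detect the cases where they do not, so the caller can fall back to another strategy instead of returning a silently wrong result:

   ```rust
   use std::collections::BTreeSet;

   /// Distinct count after a lossy `as u32` cast: values that differ only in
   /// their upper 32 bits collapse onto the same key.
   pub fn distinct_after_truncation(values: &[u64]) -> usize {
       values.iter().map(|&v| v as u32).collect::<BTreeSet<u32>>().len()
   }

   /// Checked alternative: convert only if every value fits in a u32,
   /// otherwise return the first value that does not fit.
   pub fn checked_to_u32(values: &[u64]) -> Result<Vec<u32>, u64> {
       values
           .iter()
           .map(|&v| if v <= u32::MAX as u64 { Ok(v as u32) } else { Err(v) })
           .collect()
   }
   ```

   The signed 8/16/32-bit inputs already handled in the PR are not affected in the same way. There, `x as u32` sign-extends or reinterprets the bits (e.g. `-1i32 as u32 == u32::MAX`), which still maps distinct values to distinct `u32`s.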





[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r808701903



##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::borrow::Borrow;
+
+use std::fmt::Debug;
+use std::ops::BitOrAssign;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array,
+    UInt32Array, UInt8Array,
+};
+use arrow::datatypes::{DataType, Field};
+use croaring::Bitmap;
+use log::info;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+
+use super::format_state_name;
+
+#[derive(Debug)]
+pub struct BitMapDistinct {
+    name: String,
+    input_data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl BitMapDistinct {
+    /// Create a new bitmapDistinct aggregate function.
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            input_data_type,
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for BitMapDistinct {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// the field of the final result of this aggregation.
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::UInt64, false))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
+            DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32 => Box::new(BitmapDistinctCountAccumulator::try_new()),
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Support for 'bitmap_distinct' for data type {} is not implemented",
+                    other
+                )))
+            }
+        };
+        Ok(accumulator)
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "bitmap_registers"),
+            DataType::Binary,
+            false,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct BitmapDistinctCountAccumulator {
+    bitmap: croaring::bitmap::Bitmap,
+}
+
+impl BitmapDistinctCountAccumulator {
+    fn try_new() -> Self {
+        Self {
+            bitmap: croaring::bitmap::Bitmap::create(),
+        }
+    }
+}
+
+impl Accumulator for BitmapDistinctCountAccumulator {
+    //state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values.
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        //maybe run optimized

Review comment:
       IMHO, with large amounts of data, running the optimization could greatly reduce the data shuffled between threads or processes (Ballista), which may reduce IO cost.







[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r808705218



##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::borrow::Borrow;
+
+use std::fmt::Debug;
+use std::ops::BitOrAssign;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array,
+    UInt32Array, UInt8Array,
+};
+use arrow::datatypes::{DataType, Field};
+use croaring::Bitmap;
+use log::info;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+
+use super::format_state_name;
+
+#[derive(Debug)]
+pub struct BitMapDistinct {
+    name: String,
+    input_data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl BitMapDistinct {
+    /// Create a new bitmapDistinct aggregate function.
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            input_data_type,
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for BitMapDistinct {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// the field of the final result of this aggregation.
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::UInt64, false))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
+            DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32 => Box::new(BitmapDistinctCountAccumulator::try_new()),
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Support for 'bitmap_distinct' for data type {} is not implemented",
+                    other
+                )))
+            }
+        };
+        Ok(accumulator)
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "bitmap_registers"),
+            DataType::Binary,
+            false,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct BitmapDistinctCountAccumulator {
+    bitmap: croaring::bitmap::Bitmap,
+}
+
+impl BitmapDistinctCountAccumulator {
+    fn try_new() -> Self {
+        Self {
+            bitmap: croaring::bitmap::Bitmap::create(),
+        }
+    }
+}
+
+impl Accumulator for BitmapDistinctCountAccumulator {
+    //state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values.
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        //maybe run optimized
+        let buffer = self.bitmap.serialize();
+        Ok(vec![ScalarValue::Binary(Some(buffer))])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let value = &values[0];
+        if value.is_empty() {
+            info!("BitmapDistinctCountAccumulator update_batch in empty batch");
+            return Ok(());
+        }
+        match value.data_type() {
+            DataType::Int8 => {
+                let array = value.as_any().downcast_ref::<Int8Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::Int16 => {
+                let array = value.as_any().downcast_ref::<Int16Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::Int32 => {
+                let array = value.as_any().downcast_ref::<Int32Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::UInt8 => {
+                let array = value.as_any().downcast_ref::<UInt8Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::UInt16 => {
+                let array = value.as_any().downcast_ref::<UInt16Array>().unwrap();
+                self.bitmap.add_many(
+                    &array
+                        .values()
+                        .iter()
+                        .map(|&x| x as u32)
+                        .collect::<Vec<u32>>(),
+                );
+            }
+            DataType::UInt32 => {
+                let array = value.as_any().downcast_ref::<UInt32Array>().unwrap();
+                self.bitmap.add_many(array.values());
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BITMAP_COUNT_DISTINCT is not expected to receive the type {:?}",
+                    e
+                )));
+            }
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        assert_eq!(1, states.len(), "expect only 1 element in the states");

Review comment:
       Thanks. In bitmap_distinct, `merge_batch` gets its input from `state`, and `state` only ever returns a single array of serialized bitmaps; I added the assertion to emphasize that.
   Yes, I should log an error rather than panic 👍







[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r808702220



##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::borrow::Borrow;
+
+use std::fmt::Debug;
+use std::ops::BitOrAssign;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array,
+    UInt32Array, UInt8Array,
+};
+use arrow::datatypes::{DataType, Field};
+use croaring::Bitmap;
+use log::info;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+
+use super::format_state_name;
+
+#[derive(Debug)]
+pub struct BitMapDistinct {
+    name: String,
+    input_data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl BitMapDistinct {
+    /// Create a new bitmapDistinct aggregate function.
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            input_data_type,
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for BitMapDistinct {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// the field of the final result of this aggregation.
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::UInt64, false))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
+            DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32 => Box::new(BitmapDistinctCountAccumulator::try_new()),
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Support for 'bitmap_distinct' for data type {} is not implemented",
+                    other
+                )))
+            }
+        };
+        Ok(accumulator)
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "bitmap_registers"),
+            DataType::Binary,
+            false,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct BitmapDistinctCountAccumulator {
+    bitmap: croaring::bitmap::Bitmap,
+}
+
+impl BitmapDistinctCountAccumulator {
+    fn try_new() -> Self {
+        Self {
+            bitmap: croaring::bitmap::Bitmap::create(),
+        }
+    }
+}
+
+impl Accumulator for BitmapDistinctCountAccumulator {
+    //state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values.
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        //maybe run optimized

Review comment:
       Should we check the bitmap size and then decide whether to run the optimization?
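
   One way to make that decision is to estimate the per-container sizes with and without run encoding, and only optimize when the saving is worth it. This is a sketch under assumptions. The sizes follow the Roaring format specification (array: 2 bytes per value, bitset: 8192 bytes, run: 2 bytes plus 4 bytes per run) and ignore headers. `estimate_run_optimize`, `should_run_optimize` and the `min_saving_bytes` threshold are hypothetical. As far as we understand, CRoaring's own `run_optimize` already converts a container only when the run form is smaller. The open question is therefore mainly whether the CPU cost pays for the smaller shuffle:

   ```rust
   use std::collections::BTreeMap;

   /// Estimated bytes for one 16-bit Roaring container as
   /// (array-or-bitset form, run form). Headers and offsets are ignored.
   fn container_bytes(cardinality: usize, runs: usize) -> (usize, usize) {
       let without_runs = (2 * cardinality).min(8192);
       let with_runs = 2 + 4 * runs;
       (without_runs, with_runs)
   }

   /// Estimate total container bytes before and after run optimization
   /// for a sorted, deduplicated set of u32 values.
   pub fn estimate_run_optimize(sorted: &[u32]) -> (usize, usize) {
       // Per high-16-bit key: (cardinality, number of runs, previous low bits).
       let mut chunks: BTreeMap<u16, (usize, usize, Option<u16>)> = BTreeMap::new();
       for &v in sorted {
           let (high, low) = ((v >> 16) as u16, v as u16);
           let entry = chunks.entry(high).or_insert((0, 0, None));
           entry.0 += 1;
           // A new run starts unless this value directly follows the previous one.
           if entry.2.map_or(true, |prev| prev as u32 + 1 != low as u32) {
               entry.1 += 1;
           }
           entry.2 = Some(low);
       }
       let mut before = 0;
       let mut after = 0;
       for &(card, runs, _) in chunks.values() {
           let (plain, rle) = container_bytes(card, runs);
           before += plain;
           after += plain.min(rle); // convert only when the run form is smaller
       }
       (before, after)
   }

   /// Hypothetical policy: only pay for run optimization when it would
   /// shrink the serialized state by at least `min_saving_bytes`.
   pub fn should_run_optimize(sorted: &[u32], min_saving_bytes: usize) -> bool {
       let (before, after) = estimate_run_optimize(sorted);
       before - after >= min_saving_bytes
   }
   ```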







[GitHub] [arrow-datafusion] Ted-Jiang removed a comment on pull request #1841: Implement bitmap_distinct function using roaring bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang removed a comment on pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#issuecomment-1053912928


   @Jimexist @houqp I have fixed the conflict. If there are no more issues, could you please merge this to avoid further conflicts?





[GitHub] [arrow-datafusion] houqp commented on a change in pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r810708597



##########
File path: datafusion/Cargo.toml
##########
@@ -81,6 +81,7 @@ num-traits = { version = "0.2", optional = true }
 pyo3 = { version = "0.15", optional = true }
 tempfile = "3"
 parking_lot = "0.12"
+croaring = "0.5.1"

Review comment:
       I agree with @alamb; it would be good if we could either make this dependency optional or switch to a pure Rust implementation.
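
   Here is a sketch of the optional-dependency route. It assumes a hypothetical Cargo feature named `croaring-bitmap` that would enable `croaring = { version = "0.5.1", optional = true }` in `datafusion/Cargo.toml`; alternatively, the implicit `croaring` feature that an optional dependency creates could be used. With the feature disabled, planning `bitmap_distinct` returns a `NotImplemented`-style error instead of pulling in the C library. `PlanError` and `create_bitmap_distinct` are illustrative stand-ins, not DataFusion APIs:

   ```rust
   /// Hypothetical stand-in for `DataFusionError::NotImplemented`.
   #[derive(Debug, PartialEq)]
   pub enum PlanError {
       NotImplemented(String),
   }

   /// With the (hypothetical) feature enabled, the real croaring-backed
   /// accumulator would be constructed here.
   #[cfg(feature = "croaring-bitmap")]
   pub fn create_bitmap_distinct() -> Result<&'static str, PlanError> {
       Ok("bitmap_distinct accumulator")
   }

   /// Without the feature, the function still exists but reports that the
   /// aggregate is unavailable in this build.
   #[cfg(not(feature = "croaring-bitmap"))]
   pub fn create_bitmap_distinct() -> Result<&'static str, PlanError> {
       Err(PlanError::NotImplemented(
           "bitmap_distinct requires the `croaring-bitmap` feature".to_string(),
       ))
   }
   ```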







[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r808701903



##########
File path: datafusion/src/physical_plan/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::borrow::Borrow;
+
+use std::fmt::Debug;
+use std::ops::BitOrAssign;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array,
+    UInt32Array, UInt8Array,
+};
+use arrow::datatypes::{DataType, Field};
+use croaring::Bitmap;
+use log::info;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+
+use super::format_state_name;
+
+#[derive(Debug)]
+pub struct BitMapDistinct {
+    name: String,
+    input_data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl BitMapDistinct {
+    /// Create a new bitmapDistinct aggregate function.
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            input_data_type,
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for BitMapDistinct {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// the field of the final result of this aggregation.
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::UInt64, false))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
+            DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32 => Box::new(BitmapDistinctCountAccumulator::try_new()),
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Support for 'bitmap_distinct' for data type {} is not implemented",
+                    other
+                )))
+            }
+        };
+        Ok(accumulator)
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "bitmap_registers"),
+            DataType::Binary,
+            false,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct BitmapDistinctCountAccumulator {
+    bitmap: croaring::bitmap::Bitmap,
+}
+
+impl BitmapDistinctCountAccumulator {
+    fn try_new() -> Self {
+        Self {
+            bitmap: croaring::bitmap::Bitmap::create(),
+        }
+    }
+}
+
+impl Accumulator for BitmapDistinctCountAccumulator {
+    //state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values.
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        //maybe run optimized

Review comment:
       IMHO, with a large amount of data, run-optimizing the bitmap may greatly reduce the data shuffled between threads or processes (Ballista), which may reduce I/O cost.
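You can see why run-optimizing before shipping state helps without using croaring. When a set contains long stretches of consecutive integers, storing each stretch as a (start, extra) pair takes far fewer bytes than storing every value. This is a simplified sketch, not CRoaring's serialization format: CRoaring picks an array, bitmap or run representation for each container of 2^16 values. `to_runs`, `array_bytes` and `run_bytes` are illustrative names.

```rust
/// Encode sorted, deduplicated values as (start, extra) runs, where a run
/// covers the values start..=start + extra.
pub fn to_runs(sorted: &[u32]) -> Vec<(u32, u32)> {
    let mut runs: Vec<(u32, u32)> = Vec::new();
    for &v in sorted {
        if let Some(last) = runs.last_mut() {
            // Extend the current run when `v` directly follows its end
            // (widened to u64 so the addition cannot overflow).
            if last.0 as u64 + last.1 as u64 + 1 == v as u64 {
                last.1 += 1;
                continue;
            }
        }
        // Otherwise `v` starts a new run of length one.
        runs.push((v, 0));
    }
    runs
}

/// Bytes needed to ship the values as a plain array of u32.
pub fn array_bytes(sorted: &[u32]) -> usize {
    sorted.len() * 4
}

/// Bytes needed to ship the runs (two u32 per run).
pub fn run_bytes(runs: &[(u32, u32)]) -> usize {
    runs.len() * 8
}
```

For example, 1,000 consecutive ids take 4,000 bytes as a plain array but only 8 bytes as a single run. That is the kind of saving the comment expects for shuffled state. For scattered values with no runs, the run form is larger, so a real implementation only switches representation when it is smaller.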







[GitHub] [arrow-datafusion] Ted-Jiang edited a comment on pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang edited a comment on pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#issuecomment-1042631149


   > * I wonder if/how this gets things closer to being able to do distinct on compressed data (in DF's case on dictionary encoded columns). The problem (as I understand it) is that there is no guarantee that Arrow dictionaries have the same encoded representation for a value across batches, or even in the same record batch (if I remember how dictionary concatenation currently works in Arrow).
   
   Yes, there is no guarantee that Arrow dictionaries have the same encoded representation for a value across batches.
   For non-integer columns, we plan to maintain a global dictionary that encodes the column values (e.g. strings) into 32-bit integers to accelerate count distinct.
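Here is a minimal sketch of the global-dictionary idea. It assumes a single dictionary shared by every batch. Because a string always maps to the same u32 code, codes from different batches can be unioned in one 32-bit bitmap. This sidesteps the per-batch dictionary problem raised in the quote. `GlobalDictionary` and `encode` are hypothetical names. A real version would also need to be shared across partitions (for example, behind a lock) and possibly persisted.

```rust
use std::collections::HashMap;
use std::convert::TryFrom;

/// Hypothetical global dictionary assigning each distinct string a stable
/// u32 code, so string columns can be counted with a 32-bit bitmap.
#[derive(Default)]
pub struct GlobalDictionary {
    codes: HashMap<String, u32>,
}

impl GlobalDictionary {
    /// Return the code for `s`, assigning the next free code on first sight.
    /// Returns `None` once the 32-bit code space is exhausted.
    pub fn encode(&mut self, s: &str) -> Option<u32> {
        if let Some(&code) = self.codes.get(s) {
            return Some(code);
        }
        let next = u32::try_from(self.codes.len()).ok()?;
        self.codes.insert(s.to_string(), next);
        Some(next)
    }
}
```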
   
   > * Would this work on 64-bit columns if they could first be casted to 32-bit? That is, assuming the contents of the 64-bit column actually fit as 32-bit unsigned integers?
   
   IMO, casting would drop the upper 32 bits of each value, so values that differ only in those bits would collide and the result would be incorrect.
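The collision can be shown with plain integer conversions. `as u32` keeps only the low 32 bits, so two distinct 64-bit values can map to the same code. A checked conversion instead fails on any value that does not fit. That is one way to use a 32-bit bitmap safely in exactly the case the question describes, where every value actually fits. The helper names are illustrative.

```rust
use std::convert::TryFrom;

/// Truncating cast: keeps only the low 32 bits, so values that differ only
/// in their upper 32 bits collapse to the same u32.
pub fn truncate(v: u64) -> u32 {
    v as u32
}

/// Checked cast: returns None instead of silently dropping the upper bits.
pub fn checked(v: u64) -> Option<u32> {
    u32::try_from(v).ok()
}

/// Narrow a whole column only if every value fits in 32 bits.
pub fn narrow_column(values: &[u64]) -> Option<Vec<u32>> {
    values.iter().map(|&v| checked(v)).collect()
}
```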





[GitHub] [arrow-datafusion] Ted-Jiang commented on pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#issuecomment-1042670184


   @alamb I found this in CI:
   ```
    --- stderr
     thread 'main' panicked at 'Unable to find libclang: "couldn't find any valid shared libraries matching: ['libclang.so', 'libclang-*.so', 'libclang.so.*', 'libclang-*.so.*'], set the `LIBCLANG_PATH` environment variable to a path where one of these files can be found (invalid: [])"', /github/home/.cargo/registry/src/github.com-1ecc6299db9ec823/bindgen-0.59.2/src/lib.rs:2144:31
   ```
   I think we already depend on clang.
   Do you know how to set this environment variable?
   IMO, judging from [croaring's CI workflow](https://github.com/saulius/croaring-rs/blob/master/.github/workflows/rust.yml), we only need to install LLVM in the Windows environment.





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1841: Implement bitmap_distinct function using croaring-rs bitmap

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r809028235



##########
File path: datafusion/Cargo.toml
##########
@@ -81,6 +81,7 @@ num-traits = { version = "0.2", optional = true }
 pyo3 = { version = "0.15", optional = true }
 tempfile = "3"
 parking_lot = "0.12"
+croaring = "0.5.1"

Review comment:
       I am somewhat worried about adding this dependency to datafusion (at least as a required dependency). Not only will it increase the build requirements and compile time for datafusion, it also complicates the build process (one would now need to install clang to build datafusion).







[GitHub] [arrow-datafusion] jychen7 commented on a change in pull request #1841: Implement bitmap_distinct function using roaring bitmap

Posted by GitBox <gi...@apache.org>.
jychen7 commented on a change in pull request #1841:
URL: https://github.com/apache/arrow-datafusion/pull/1841#discussion_r835693924



##########
File path: datafusion-physical-expr/src/expressions/bitmap_distinct.rs
##########
@@ -0,0 +1,229 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       [brainstorming] How about we combine this with `ApproxDistinct` and use `BitmapDistinctCountAccumulator` for int8, int16 and int32 if the feature is available?
   
   And use `NumericHLLAccumulator` for int64 and other non-int types. This way, users just need to call `approx_distinct` and can rely on DataFusion to automatically select the best approximation algorithm.
   
   https://github.com/apache/arrow-datafusion/blob/81592947e8814327ebdbd1fbc3d4a090796e37a3/datafusion-physical-expr/src/expressions/approx_distinct.rs#L91-L98
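Here is a sketch of the proposed dispatch, under simplifying assumptions. `InputType` and `Choice` are hypothetical stand-ins; the real code would match on `arrow::datatypes::DataType` inside `create_accumulator`. `bitmap_available` stands for whether the optional bitmap feature was compiled in. The unsigned types are included because the `create_accumulator` in this PR's diff accepts them as well.

```rust
/// A few input types, mirroring a subset of Arrow's `DataType` variants.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum InputType {
    Int8,
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt16,
    UInt32,
    UInt64,
    Utf8,
}

/// Which accumulator `approx_distinct` would pick under this proposal.
#[derive(Debug, PartialEq)]
pub enum Choice {
    /// Exact count using a 32-bit roaring bitmap.
    Bitmap,
    /// HyperLogLog estimate.
    HyperLogLog,
}

/// Use the bitmap for types that fit in 32 bits when the feature is
/// available; fall back to HyperLogLog for everything else.
pub fn choose(t: InputType, bitmap_available: bool) -> Choice {
    match t {
        InputType::Int8
        | InputType::Int16
        | InputType::Int32
        | InputType::UInt8
        | InputType::UInt16
        | InputType::UInt32
            if bitmap_available =>
        {
            Choice::Bitmap
        }
        _ => Choice::HyperLogLog,
    }
}
```

Note that the bitmap path would return an exact count under the `approx_distinct` name.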
   
   ---
   
   Unrelated note: as a user, I do want to keep `count(distinct)` as an exact count and `approx_distinct` as an approximation.



