You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "pitrou (via GitHub)" <gi...@apache.org> on 2023/02/01 14:01:45 UTC

[GitHub] [arrow] pitrou commented on a diff in pull request #33776: GH-15164: [C++][Parquet] Implement current version of BloomFilter spec

pitrou commented on code in PR #33776:
URL: https://github.com/apache/arrow/pull/33776#discussion_r1093263708


##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
   PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
   memcpy(data_->mutable_data(), bitset, num_bytes_);
 
-  this->hasher_.reset(new MurmurHash3());
+  this->hasher_ = std::make_unique<XxHasher>();
 }
 
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
-  uint32_t len, hash, algorithm;
-  int64_t bytes_available;
+static constexpr uint32_t kBloomFilterHeaderSizeGuess = 256;
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &len));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+static ::arrow::Status ValidateBloomFilterHeader(
+    const format::BloomFilterHeader& header) {
+  if (!header.algorithm.__isset.BLOCK) {
+    return ::arrow::Status::Invalid(
+        "Unsupported Bloom filter algorithm: ", header.algorithm, ".");
   }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &hash));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
-  }
-  if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
-    throw ParquetException("Unsupported hash strategy");
+  if (!header.hash.__isset.XXHASH) {
+    return ::arrow::Status::Invalid("Unsupported Bloom filter hash: ", header.hash, ".");
   }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &algorithm));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  if (!header.compression.__isset.UNCOMPRESSED) {
+    return ::arrow::Status::Invalid(
+        "Unsupported Bloom filter compression: ", header.compression, ".");
   }
-  if (static_cast<Algorithm>(algorithm) != BloomFilter::Algorithm::BLOCK) {
-    throw ParquetException("Unsupported Bloom filter algorithm");
+
+  if (header.numBytes <= 0 ||
+      static_cast<uint32_t>(header.numBytes) > BloomFilter::kMaximumBloomFilterBytes) {
+    std::stringstream ss;
+    ss << "Bloom filter size is incorrect: " << header.numBytes << ". Must be in range ("
+       << 0 << ", " << BloomFilter::kMaximumBloomFilterBytes << "].";
+    return ::arrow::Status::Invalid(ss.str());
   }
 
-  BlockSplitBloomFilter bloom_filter;
+  return ::arrow::Status::OK();
+}
 
-  PARQUET_ASSIGN_OR_THROW(auto buffer, input->Read(len));
-  bloom_filter.Init(buffer->data(), len);
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
+    const ReaderProperties& properties, ArrowInputStream* input) {
+  uint32_t header_size = 0;
+
+  ThriftDeserializer deserializer(properties);
+  format::BloomFilterHeader header;
+
+  // Read and deserialize bloom filter header
+  PARQUET_ASSIGN_OR_THROW(auto sv, input->Read(kBloomFilterHeaderSizeGuess));
+  // This gets used, then set by DeserializeThriftMsg
+  header_size = static_cast<uint32_t>(sv->size());
+  try {
+    deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(sv->data()),
+                                    &header_size, &header);
+  } catch (std::exception& e) {
+    std::stringstream ss;
+    ss << "Deserializing bloom filter header failed.\n" << e.what();
+    throw ParquetException(ss.str());
+  }
+  // Throw if the header is invalid
+  PARQUET_THROW_NOT_OK(ValidateBloomFilterHeader(header));
+  int32_t bloom_filter_data_size = header.numBytes;
+  bool all_data_within_sv = bloom_filter_data_size + header_size <= sv->size();
+  if (all_data_within_sv) {
+    BlockSplitBloomFilter bloom_filter;
+    bloom_filter.Init(sv->data() + header_size, bloom_filter_data_size);
+    return bloom_filter;
+  }
+  DCHECK(sv->size() > header_size);

Review Comment:
   `sv->size()` and `header_size` could just be equal here, so I would say `DCHECK_GE(sv->size(), header_size)` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org