You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@orc.apache.org by GitBox <gi...@apache.org> on 2021/03/02 13:48:27 UTC

[GitHub] [orc] wgtmac commented on a change in pull request #476: ORC-40: [C++] Implement Predicate Pushdown for C++ Reader

wgtmac commented on a change in pull request #476:
URL: https://github.com/apache/orc/pull/476#discussion_r585523244



##########
File path: c++/src/Reader.cc
##########
@@ -68,6 +68,12 @@ namespace orc {
       return columnPath.substr(0, columnPath.length() - 1);
   }
 
+  WriterVersion getWriterVersionImpl(const FileContents * contents) {
+    if (!contents->postscript->has_writerversion()) {
+      return WriterVersion_ORIGINAL;

Review comment:
       This was moved from Reader.cc:520. Changing it to WriterVersion_MAX breaks the c++ tool-test. I would keep it unchanged for now.

##########
File path: c++/include/orc/Reader.hh
##########
@@ -191,6 +192,11 @@ namespace orc {
      */
     RowReaderOptions& setEnableLazyDecoding(bool enable);
 
+    /**
+     * Set search argument for predicate push down
+     */
+    RowReaderOptions& setSearchArgument(std::unique_ptr<SearchArgument> sargs);

Review comment:
       Renamed setSearchArgument to searchArgument. I have checked getColumnNames() in the Java code. It is just an internal reader option and is not used anywhere, so I prefer not to add it at this moment.

##########
File path: c++/src/ColumnReader.cc
##########
@@ -1553,6 +1558,8 @@ namespace orc {
     ColumnReader::seekToRowGroup(positions);
     valueStream->seek(positions.at(columnId));
     scaleDecoder->seek(positions.at(columnId));
+    buffer = nullptr;
+    bufferEnd = nullptr;

Review comment:
       Done

##########
File path: c++/src/Reader.cc
##########
@@ -293,10 +308,19 @@ namespace orc {
     previousRow = rowNumber;
     startNextStripe();
 
+    // when predicate push down is enabled, above call to startNextStripe()
+    // will move current row to 1st matching row group; here we can simply return
+    if (sargsApplier) {

Review comment:
       Done

##########
File path: c++/src/Reader.cc
##########
@@ -376,6 +419,14 @@ namespace orc {
     return forcedScaleOnHive11Decimal;
   }
 
+  bool RowReaderImpl::getPPDStats(std::pair<uint64_t, uint64_t>& stats) const {
+    if (!sargsApplier) {
+      return false;
+    }
+    stats = sargsApplier->getStats();

Review comment:
       I will do that in a separate patch as this patch does not evaluate stripe stats yet.

##########
File path: c++/test/TestPredicatePushdown.cc
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "orc/OrcFile.hh"
+#include "orc/sargs/SearchArgument.hh"
+#include "MemoryInputStream.hh"
+#include "MemoryOutputStream.hh"
+#include "wrap/gtest-wrapper.h"
+
+namespace orc {
+
+  static const int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024; // 10M
+
+  TEST(TestPredicatePushdown, testPredicatePushdown) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+    auto type = std::unique_ptr<Type>(Type::buildTypeFromString(
+      "struct<x:bigint,y:string>"));
+    WriterOptions options;
+    options.setStripeSize(1024 * 1024)
+      .setCompressionBlockSize(1024)
+      .setCompression(CompressionKind_NONE)
+      .setMemoryPool(pool)
+      .setRowIndexStride(1000);
+
+    auto writer = createWriter(*type, &memStream, options);
+    auto batch = writer->createRowBatch(3500);
+    auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+    auto& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
+    auto& strBatch = dynamic_cast<StringVectorBatch&>(*structBatch.fields[1]);
+
+    char buffer[3500 * 5];
+    uint64_t offset = 0;
+    for (uint64_t i = 0; i < 3500; ++i) {
+      longBatch.data[i] = static_cast<int64_t>(i * 300);
+
+      std::ostringstream ss;
+      ss << 10 * i;
+      std::string str = ss.str();
+      memcpy(buffer + offset, str.c_str(), str.size());
+      strBatch.data[i] = buffer + offset;
+      strBatch.length[i] = static_cast<int64_t>(str.size());
+      offset += str.size();
+    }
+    structBatch.numElements = 3500;
+    longBatch.numElements = 3500;
+    strBatch.numElements = 3500;
+    writer->add(*batch);
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(new MemoryInputStream (
+      memStream.getData(), memStream.getLength()));
+    ReaderOptions readerOptions;
+    options.setMemoryPool(pool);
+    auto reader = createReader(std::move(inStream), readerOptions);
+    EXPECT_EQ(3500, reader->getNumberOfRows());
+
+    // build search argument (x >= 300000 AND x < 600000)

Review comment:
       Done

##########
File path: c++/src/sargs/SargsApplier.cc
##########
@@ -25,7 +26,13 @@ namespace orc {
                                     const std::string& colName) {
     for (uint64_t i = 0; i != type.getSubtypeCount(); ++i) {
       if (type.getFieldName(i) == colName) {
-        return type.getSubtype(i)->getColumnId();
+        if (type.getKind() == CHAR || type.getKind() == VARCHAR) {
+          ///FIXME: disable PPD for char and varchar types as their rules

Review comment:
       I don't have a clean solution here. So I have left a comment here to fix it in the future.

##########
File path: c++/src/Reader.cc
##########
@@ -376,6 +419,14 @@ namespace orc {
     return forcedScaleOnHive11Decimal;
   }
 
+  bool RowReaderImpl::getPPDStats(std::pair<uint64_t, uint64_t>& stats) const {
+    if (!sargsApplier) {

Review comment:
       Done

##########
File path: c++/test/TestPredicatePushdown.cc
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "orc/OrcFile.hh"
+#include "orc/sargs/SearchArgument.hh"
+#include "MemoryInputStream.hh"
+#include "MemoryOutputStream.hh"
+#include "wrap/gtest-wrapper.h"
+
+namespace orc {
+
+  static const int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024; // 10M
+
+  TEST(TestPredicatePushdown, testPredicatePushdown) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+    auto type = std::unique_ptr<Type>(Type::buildTypeFromString(
+      "struct<x:bigint,y:string>"));
+    WriterOptions options;
+    options.setStripeSize(1024 * 1024)
+      .setCompressionBlockSize(1024)
+      .setCompression(CompressionKind_NONE)
+      .setMemoryPool(pool)
+      .setRowIndexStride(1000);
+
+    auto writer = createWriter(*type, &memStream, options);
+    auto batch = writer->createRowBatch(3500);
+    auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+    auto& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
+    auto& strBatch = dynamic_cast<StringVectorBatch&>(*structBatch.fields[1]);
+
+    char buffer[3500 * 5];
+    uint64_t offset = 0;
+    for (uint64_t i = 0; i < 3500; ++i) {
+      longBatch.data[i] = static_cast<int64_t>(i * 300);
+
+      std::ostringstream ss;
+      ss << 10 * i;
+      std::string str = ss.str();
+      memcpy(buffer + offset, str.c_str(), str.size());
+      strBatch.data[i] = buffer + offset;
+      strBatch.length[i] = static_cast<int64_t>(str.size());
+      offset += str.size();
+    }
+    structBatch.numElements = 3500;
+    longBatch.numElements = 3500;
+    strBatch.numElements = 3500;
+    writer->add(*batch);
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(new MemoryInputStream (
+      memStream.getData(), memStream.getLength()));
+    ReaderOptions readerOptions;
+    options.setMemoryPool(pool);
+    auto reader = createReader(std::move(inStream), readerOptions);
+    EXPECT_EQ(3500, reader->getNumberOfRows());
+
+    // build search argument (x >= 300000 AND x < 600000)
+    {
+      std::unique_ptr<SearchArgument> sarg = SearchArgumentFactory::newBuilder()
+        ->startAnd()
+        .startNot()
+        .lessThan("x", PredicateDataType::LONG,
+                  Literal(static_cast<int64_t>(300000L)))
+        .end()
+        .lessThan("x", PredicateDataType::LONG,
+                  Literal(static_cast<int64_t>(600000L)))
+        .end()
+        .build();
+
+      RowReaderOptions rowReaderOpts;
+      rowReaderOpts.setSearchArgument(std::move(sarg));
+      auto rowReader = reader->createRowReader(rowReaderOpts);
+
+      auto readBatch = rowReader->createRowBatch(2000);
+      auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+      auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+      auto& batch2 = dynamic_cast<StringVectorBatch&>(*batch0.fields[1]);
+
+      EXPECT_EQ(true, rowReader->next(*readBatch));
+      EXPECT_EQ(1000, readBatch->numElements);
+      EXPECT_EQ(1000, rowReader->getRowNumber());
+      for (uint64_t i = 1000; i < 2000; ++i) {
+        EXPECT_EQ(300 * i, batch1.data[i - 1000]);
+        EXPECT_EQ(std::to_string(10 * i),
+          std::string(batch2.data[i - 1000], static_cast<size_t>(batch2.length[i - 1000])));
+      }
+      EXPECT_EQ(false, rowReader->next(*readBatch));
+      EXPECT_EQ(3500, rowReader->getRowNumber());
+    }
+
+    // look through the file with no rows selected: x < 0
+    {
+      std::unique_ptr<SearchArgument> sarg = SearchArgumentFactory::newBuilder()
+        ->startAnd()
+        .lessThan("x", PredicateDataType::LONG,
+          Literal(static_cast<int64_t>(0)))
+        .end()
+        .build();
+
+      RowReaderOptions rowReaderOpts;
+      rowReaderOpts.setSearchArgument(std::move(sarg));
+      auto rowReader = reader->createRowReader(rowReaderOpts);
+
+      auto readBatch = rowReader->createRowBatch(2000);
+      EXPECT_EQ(false, rowReader->next(*readBatch));
+      EXPECT_EQ(3500, rowReader->getRowNumber());
+    }
+
+    // select first 100 and last 100 rows: x < 30000 OR x >= 1020000
+    {
+      std::unique_ptr<SearchArgument> sarg = SearchArgumentFactory::newBuilder()
+        ->startOr()
+        .lessThan("x", PredicateDataType::LONG,
+          Literal(static_cast<int64_t>(300 * 100)))
+        .startNot()
+        .lessThan("x", PredicateDataType::LONG,
+          Literal(static_cast<int64_t>(300 * 3400)))
+        .end()
+        .end()
+        .build();
+
+      RowReaderOptions rowReaderOpts;
+      rowReaderOpts.setSearchArgument(std::move(sarg));
+      auto rowReader = reader->createRowReader(rowReaderOpts);
+
+      auto readBatch = rowReader->createRowBatch(2000);
+      auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+      auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+      auto& batch2 = dynamic_cast<StringVectorBatch&>(*batch0.fields[1]);
+
+      EXPECT_EQ(true, rowReader->next(*readBatch));
+      EXPECT_EQ(1000, readBatch->numElements);
+      EXPECT_EQ(0, rowReader->getRowNumber());
+      for (uint64_t i = 0; i < 1000; ++i) {
+        EXPECT_EQ(300 * i, batch1.data[i]);
+        EXPECT_EQ(std::to_string(10 * i),
+                  std::string(batch2.data[i], static_cast<size_t>(batch2.length[i])));
+      }
+
+      EXPECT_EQ(true, rowReader->next(*readBatch));
+      EXPECT_EQ(500, readBatch->numElements);
+      EXPECT_EQ(3000, rowReader->getRowNumber());
+      for (uint64_t i = 3000; i < 3500; ++i) {
+        EXPECT_EQ(300 * i, batch1.data[i - 3000]);
+        EXPECT_EQ(std::to_string(10 * i),
+                  std::string(batch2.data[i - 3000], static_cast<size_t>(batch2.length[i - 3000])));
+      }
+
+      EXPECT_EQ(false, rowReader->next(*readBatch));
+      EXPECT_EQ(3500, rowReader->getRowNumber());
+
+      // test seek to 3rd row group but is adjusted to 4th row group
+      rowReader->seekToRow(2500);

Review comment:
       Done

##########
File path: c++/test/TestPredicatePushdown.cc
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "orc/OrcFile.hh"
+#include "orc/sargs/SearchArgument.hh"
+#include "MemoryInputStream.hh"
+#include "MemoryOutputStream.hh"
+#include "wrap/gtest-wrapper.h"
+
+namespace orc {
+
+  static const int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024; // 10M
+
+  TEST(TestPredicatePushdown, testPredicatePushdown) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+    auto type = std::unique_ptr<Type>(Type::buildTypeFromString(
+      "struct<x:bigint,y:string>"));
+    WriterOptions options;
+    options.setStripeSize(1024 * 1024)
+      .setCompressionBlockSize(1024)
+      .setCompression(CompressionKind_NONE)
+      .setMemoryPool(pool)
+      .setRowIndexStride(1000);
+
+    auto writer = createWriter(*type, &memStream, options);
+    auto batch = writer->createRowBatch(3500);
+    auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+    auto& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
+    auto& strBatch = dynamic_cast<StringVectorBatch&>(*structBatch.fields[1]);
+
+    char buffer[3500 * 5];
+    uint64_t offset = 0;
+    for (uint64_t i = 0; i < 3500; ++i) {
+      longBatch.data[i] = static_cast<int64_t>(i * 300);
+
+      std::ostringstream ss;
+      ss << 10 * i;
+      std::string str = ss.str();
+      memcpy(buffer + offset, str.c_str(), str.size());
+      strBatch.data[i] = buffer + offset;
+      strBatch.length[i] = static_cast<int64_t>(str.size());
+      offset += str.size();
+    }
+    structBatch.numElements = 3500;
+    longBatch.numElements = 3500;
+    strBatch.numElements = 3500;
+    writer->add(*batch);
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(new MemoryInputStream (
+      memStream.getData(), memStream.getLength()));
+    ReaderOptions readerOptions;
+    options.setMemoryPool(pool);
+    auto reader = createReader(std::move(inStream), readerOptions);
+    EXPECT_EQ(3500, reader->getNumberOfRows());
+
+    // build search argument (x >= 300000 AND x < 600000)
+    {
+      std::unique_ptr<SearchArgument> sarg = SearchArgumentFactory::newBuilder()
+        ->startAnd()
+        .startNot()
+        .lessThan("x", PredicateDataType::LONG,
+                  Literal(static_cast<int64_t>(300000L)))
+        .end()
+        .lessThan("x", PredicateDataType::LONG,
+                  Literal(static_cast<int64_t>(600000L)))
+        .end()
+        .build();
+
+      RowReaderOptions rowReaderOpts;
+      rowReaderOpts.setSearchArgument(std::move(sarg));
+      auto rowReader = reader->createRowReader(rowReaderOpts);
+
+      auto readBatch = rowReader->createRowBatch(2000);
+      auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+      auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+      auto& batch2 = dynamic_cast<StringVectorBatch&>(*batch0.fields[1]);
+
+      EXPECT_EQ(true, rowReader->next(*readBatch));
+      EXPECT_EQ(1000, readBatch->numElements);
+      EXPECT_EQ(1000, rowReader->getRowNumber());
+      for (uint64_t i = 1000; i < 2000; ++i) {
+        EXPECT_EQ(300 * i, batch1.data[i - 1000]);
+        EXPECT_EQ(std::to_string(10 * i),
+          std::string(batch2.data[i - 1000], static_cast<size_t>(batch2.length[i - 1000])));
+      }
+      EXPECT_EQ(false, rowReader->next(*readBatch));
+      EXPECT_EQ(3500, rowReader->getRowNumber());
+    }
+
+    // look through the file with no rows selected: x < 0
+    {
+      std::unique_ptr<SearchArgument> sarg = SearchArgumentFactory::newBuilder()
+        ->startAnd()
+        .lessThan("x", PredicateDataType::LONG,
+          Literal(static_cast<int64_t>(0)))
+        .end()
+        .build();
+
+      RowReaderOptions rowReaderOpts;
+      rowReaderOpts.setSearchArgument(std::move(sarg));
+      auto rowReader = reader->createRowReader(rowReaderOpts);
+
+      auto readBatch = rowReader->createRowBatch(2000);
+      EXPECT_EQ(false, rowReader->next(*readBatch));
+      EXPECT_EQ(3500, rowReader->getRowNumber());
+    }
+
+    // select first 100 and last 100 rows: x < 30000 OR x >= 1020000
+    {
+      std::unique_ptr<SearchArgument> sarg = SearchArgumentFactory::newBuilder()
+        ->startOr()
+        .lessThan("x", PredicateDataType::LONG,
+          Literal(static_cast<int64_t>(300 * 100)))
+        .startNot()
+        .lessThan("x", PredicateDataType::LONG,
+          Literal(static_cast<int64_t>(300 * 3400)))
+        .end()
+        .end()
+        .build();
+
+      RowReaderOptions rowReaderOpts;
+      rowReaderOpts.setSearchArgument(std::move(sarg));
+      auto rowReader = reader->createRowReader(rowReaderOpts);
+
+      auto readBatch = rowReader->createRowBatch(2000);
+      auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+      auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+      auto& batch2 = dynamic_cast<StringVectorBatch&>(*batch0.fields[1]);
+
+      EXPECT_EQ(true, rowReader->next(*readBatch));
+      EXPECT_EQ(1000, readBatch->numElements);

Review comment:
       The above comment for the predicate is slightly wrong, but 2 row groups are still selected.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org