Posted to commits@doris.apache.org by ji...@apache.org on 2023/06/14 03:09:41 UTC

[doris-thirdparty] branch clucene updated: [improvement]optimize reduce index write time (#87)

This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch clucene
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git


The following commit(s) were added to refs/heads/clucene by this push:
     new 0f33e06d [improvement]optimize reduce index write time (#87)
0f33e06d is described below

commit 0f33e06d5c18a073b5858179a0a7d4daebee01b8
Author: zzzxl <33...@users.noreply.github.com>
AuthorDate: Wed Jun 14 11:09:33 2023 +0800

    [improvement]optimize reduce index write time (#87)
---
 src/core/CLucene/analysis/AnalysisHeader.cpp |  7 ---
 src/core/CLucene/analysis/AnalysisHeader.h   | 33 +++++++++-
 src/core/CLucene/analysis/Analyzers.cpp      |  2 +-
 src/core/CLucene/analysis/Analyzers.h        | 23 ++++---
 src/core/CLucene/index/SDocumentWriter.cpp   | 50 ++++++++++-----
 src/core/CLucene/index/SDocumentWriter.h     |  3 +-
 src/core/CLucene/store/IndexOutput.h         |  2 +-
 src/core/CLucene/util/stringUtil.h           | 94 ++++++++++++++++++++++++++++
 8 files changed, 178 insertions(+), 36 deletions(-)

diff --git a/src/core/CLucene/analysis/AnalysisHeader.cpp b/src/core/CLucene/analysis/AnalysisHeader.cpp
index a7fd819e..f90085c0 100644
--- a/src/core/CLucene/analysis/AnalysisHeader.cpp
+++ b/src/core/CLucene/analysis/AnalysisHeader.cpp
@@ -43,13 +43,6 @@ size_t Token::termLength<TCHAR>(){
     return _termTextLen;
 };
 
-template <>
-size_t Token::termLength<char>(){
-    if ( _termTextLen == -1 ) //it was invalidated by growBuffer
-        _termTextLen = strlen((char*)_buffer);
-    return _termTextLen;
-};
-
 ///Compares the Token for their order
 class OrderCompare:LUCENE_BASE, public CL_NS(util)::Compare::_base //<Token*>
 {
diff --git a/src/core/CLucene/analysis/AnalysisHeader.h b/src/core/CLucene/analysis/AnalysisHeader.h
index cbe0453d..07beb9e4 100644
--- a/src/core/CLucene/analysis/AnalysisHeader.h
+++ b/src/core/CLucene/analysis/AnalysisHeader.h
@@ -68,6 +68,7 @@ private:
     size_t bufferTextLen;
     void *_buffer;       ///< the text of the term
     int32_t _termTextLen;///< the length of termText. Internal use only
+    bool isNoCopy = false;
 
     CL_NS(index)::Payload *payload;
 
@@ -84,7 +85,10 @@ public:
         bufferTextLen = 0;
     };
     virtual ~Token() {
-        free(_buffer);
+        if (!isNoCopy) {
+            free(_buffer);
+            _buffer = nullptr;
+        }
         _CLLDELETE(payload);
     };
 
@@ -99,6 +103,15 @@ public:
         setText(text, end - start);
     };
 
+    template<typename T>
+    void setNoCopy(const T *text, const int32_t start, const int32_t end, const TCHAR *typ = NULL) {
+        _startOffset = start;
+        _endOffset = end;
+        _type = (typ == NULL ? getDefaultType() : typ);
+        positionIncrement = 1;
+        setTextNoCopy(text, end - start);
+    };
+
     size_t bufferLength() const {
         return bufferTextLen;
     }
@@ -132,12 +145,12 @@ public:
     int32_t getPositionIncrement() const { return positionIncrement; }
 
     template<typename T>
-    T *termBuffer() const {
+    inline T *termBuffer() const {
         return (T *) _buffer;
     }
 
     template<typename T>
-    size_t termLength();//< Length of the the termBuffer. See #termBuffer
+    inline size_t termLength();//< Length of the the termBuffer. See #termBuffer
 
     void resetTermTextLen() {
         _termTextLen = -1;
@@ -152,6 +165,13 @@ public:
         ((T *) _buffer)[_termTextLen] = 0;//make sure null terminated
     };
 
+    template<typename T>
+    void setTextNoCopy(const T *text, int32_t l) {
+        _termTextLen = l;
+        _buffer = (void*)text;
+        isNoCopy = true;
+    };
+
     int32_t startOffset() const {
         return _startOffset;
     }
@@ -196,6 +216,13 @@ public:
     }
 };
 
+template <>
+inline size_t Token::termLength<char>(){
+    if ( _termTextLen == -1 ) //it was invalidated by growBuffer
+        _termTextLen = strlen((char*)_buffer);
+    return _termTextLen;
+};
+
 class CLUCENE_EXPORT TokenStream {
 public:
     /** Returns the next token in the stream, or null at EOS.
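The Token changes above let a token point directly at caller-owned text instead of copying it into the token's own buffer; the new isNoCopy flag tells the destructor to skip the free() for borrowed memory. A minimal standalone sketch of that ownership pattern (a hypothetical MiniToken, not the actual CLucene Token class):

    #include <cstdlib>
    #include <cstring>
    #include <iostream>

    // Simplified model of the owned-vs-borrowed buffer idea from the Token change above.
    struct MiniToken {
        void*  buffer   = nullptr;  // term text
        size_t length   = 0;
        bool   isNoCopy = false;    // true when buffer is borrowed, not owned

        // Copying setter: allocates and owns the text (the old setText path).
        void setText(const char* text, size_t len) {
            if (isNoCopy) { buffer = nullptr; isNoCopy = false; }  // never realloc borrowed memory
            buffer = std::realloc(buffer, len + 1);
            std::memcpy(buffer, text, len);
            static_cast<char*>(buffer)[len] = '\0';
            length = len;
        }

        // Zero-copy setter: just records the caller's pointer.
        // The caller must keep `text` alive for as long as the token is used.
        void setTextNoCopy(const char* text, size_t len) {
            if (!isNoCopy) std::free(buffer);
            buffer = const_cast<char*>(text);
            length = len;
            isNoCopy = true;
        }

        ~MiniToken() {
            if (!isNoCopy) std::free(buffer);  // only free memory we own
        }
    };

    int main() {
        char scratch[] = "hello";               // tokenizer-owned scratch buffer
        MiniToken t;
        t.setTextNoCopy(scratch, 5);            // no allocation, no memcpy
        std::cout << static_cast<const char*>(t.buffer) << "\n";
    }

The trade-off is lifetime: handing out the tokenizer's internal buffer is only safe if the consumer finishes with it before the buffer is reused, which appears to hold here because addPosition (in the SDocumentWriter.cpp hunk further down) copies the term text into its char pool before the next token is produced.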
diff --git a/src/core/CLucene/analysis/Analyzers.cpp b/src/core/CLucene/analysis/Analyzers.cpp
index ebaa82b6..3ea3e8b0 100644
--- a/src/core/CLucene/analysis/Analyzers.cpp
+++ b/src/core/CLucene/analysis/Analyzers.cpp
@@ -94,7 +94,7 @@ Token *SimpleTokenizer<char>::next(Token *token) {
             break;            // return 'em
     }
     buffer[length] = 0;
-    token->set(buffer, start, start + length);
+    token->setNoCopy(buffer, start, start + length);
 
     return token;
 };
diff --git a/src/core/CLucene/analysis/Analyzers.h b/src/core/CLucene/analysis/Analyzers.h
index f5da43c6..17f88cff 100644
--- a/src/core/CLucene/analysis/Analyzers.h
+++ b/src/core/CLucene/analysis/Analyzers.h
@@ -184,16 +184,23 @@ public:
         return _CLNEW SimpleTokenizer<T>(reader);
     }
     TokenStream* reusableTokenStream(const TCHAR* fieldName, CL_NS(util)::Reader* reader) override{
-        auto* tokenizer = static_cast<Tokenizer*>(getPreviousTokenStream());
-        if (tokenizer == nullptr) {
-            tokenizer = _CLNEW SimpleTokenizer<T>(reader);
-            setPreviousTokenStream(tokenizer);
-        } else
-            tokenizer->reset(reader);
-        return tokenizer;
+        if (tokenizer_ == nullptr) {
+            tokenizer_ = new SimpleTokenizer<T>(reader);
+        } else {
+            tokenizer_->reset(reader);
+        }
+        return tokenizer_;
     };
 
-    virtual ~SimpleAnalyzer(){}
+    virtual ~SimpleAnalyzer() {
+        if (tokenizer_) {
+            delete tokenizer_;
+            tokenizer_ = nullptr;
+        }
+    }
+
+private:
+    SimpleTokenizer<T>* tokenizer_ = nullptr;
 };
 
 /**
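The reusableTokenStream change caches one SimpleTokenizer per analyzer and resets it on each reuse, so tokenizing every field no longer allocates a fresh tokenizer. A rough standalone sketch of that reuse pattern (hypothetical MiniAnalyzer/MiniTokenizer names; the sketch uses unique_ptr where the patch keeps a raw pointer and deletes it in the destructor):

    #include <iostream>
    #include <memory>
    #include <string>

    // Hypothetical stand-ins for Reader/Tokenizer to illustrate the reuse pattern.
    struct MiniReader { std::string data; };

    struct MiniTokenizer {
        const MiniReader* reader = nullptr;
        explicit MiniTokenizer(const MiniReader* r) : reader(r) {}
        void reset(const MiniReader* r) { reader = r; }   // rebind instead of reallocating
    };

    class MiniAnalyzer {
    public:
        MiniTokenizer* reusableTokenStream(const MiniReader* reader) {
            if (!tokenizer_) {
                tokenizer_ = std::make_unique<MiniTokenizer>(reader);  // first use: allocate once
            } else {
                tokenizer_->reset(reader);                             // later uses: just rebind
            }
            return tokenizer_.get();
        }
    private:
        std::unique_ptr<MiniTokenizer> tokenizer_;   // owned, freed with the analyzer
    };

    int main() {
        MiniAnalyzer a;
        MiniReader r1{"first doc"}, r2{"second doc"};
        auto* t1 = a.reusableTokenStream(&r1);
        auto* t2 = a.reusableTokenStream(&r2);
        std::cout << (t1 == t2 ? "same tokenizer reused\n" : "new tokenizer\n");
    }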
diff --git a/src/core/CLucene/index/SDocumentWriter.cpp b/src/core/CLucene/index/SDocumentWriter.cpp
index 6c06519c..3b22fdad 100644
--- a/src/core/CLucene/index/SDocumentWriter.cpp
+++ b/src/core/CLucene/index/SDocumentWriter.cpp
@@ -140,15 +140,18 @@ typename SDocumentsWriter<T>::ThreadState *SDocumentsWriter<T>::getThreadState(D
         threadState = _CLNEW ThreadState(this);
     }
 
-    ThreadState *state = threadState;
-    if (segment.empty())
+    if (segment.empty()) {
         segment = writer->newSegmentName();
-    state->init(doc, nextDocID);
+        threadState->init(doc, nextDocID);
+    }
+
+    threadState->docID = nextDocID;
+
     // Only increment nextDocID & numDocsInRAM on successful init
     nextDocID++;
     numDocsInRAM++;
 
-    return state;
+    return threadState;
 }
 
 template<typename T>
@@ -244,6 +247,7 @@ void SDocumentsWriter<T>::ThreadState::init(Document *doc, int32_t doc_id) {
 
         fp->docFields.values[fp->fieldCount++] = field;
     }
+    _parent->hasProx_ = _parent->fieldInfos->hasProx();
 }
 
 template<typename T>
@@ -313,7 +317,7 @@ void SDocumentsWriter<T>::ThreadState::FieldData::processField(Analyzer *sanalyz
                 threadState->numStoredFields++;
             }
 
-            docFieldsFinal.values[j] = NULL;
+            // docFieldsFinal.values[j] = NULL;
         }
     } catch (exception &ae) {
         throw ae;
@@ -440,10 +444,27 @@ void SDocumentsWriter<T>::ThreadState::writeProxBytes(const uint8_t *b, int32_t
     }
 }
 
+template <typename T>
+inline bool eq(const std::basic_string_view<T>& a, const std::basic_string_view<T>& b) {
+    if constexpr (std::is_same_v<T, char>) {
+#if defined(__SSE2__)
+        if (a.size() != b.size()) {
+            return false;
+        }
+        return StringUtil::memequalSSE2Wide(a.data(), b.data(), a.size());
+#endif
+    }
+    return a == b;
+}
+
 template<typename T>
 void SDocumentsWriter<T>::ThreadState::FieldData::addPosition(Token *token) {
     const T *tokenText = token->termBuffer<T>();
     const int32_t tokenTextLen = token->termLength<T>();
+    std::basic_string_view<T> term(tokenText, tokenTextLen);
+    // if constexpr (std::is_same_v<T, char>) {
+    //     std::cout << term << std::endl;
+    // }
     uint32_t code = 0;
 
     // Compute hashcode
@@ -459,16 +480,14 @@ void SDocumentsWriter<T>::ThreadState::FieldData::addPosition(Token *token) {
     // Locate Posting in hash
     threadState->p = postingsHash[hashPos];
 
-    if (threadState->p != nullptr && !threadState->postingEquals(tokenText, tokenTextLen)) {
-        // Conflict: keep searching different locations in
-        // the hash table.
+    if (threadState->p != nullptr && !eq(threadState->p->term_, term)) {
         const uint32_t inc = ((code >> 8) + code) | 1;
         do {
             postingsHashConflicts++;
             code += inc;
             hashPos = code & postingsHashMask;
             threadState->p = postingsHash[hashPos];
-        } while (threadState->p != nullptr && !threadState->postingEquals(tokenText, tokenTextLen));
+        } while (threadState->p != nullptr && !eq(threadState->p->term_, term));
     }
 
     int32_t proxCode = 0;
@@ -530,6 +549,7 @@ void SDocumentsWriter<T>::ThreadState::FieldData::addPosition(Token *token) {
             threadState->scharPool->tUpto += textLen1;
 
             memcpy(textUpto, tokenText, tokenTextLen * sizeof(T));
+            threadState->p->term_ = std::basic_string_view<T>(textUpto, term.size());
             textUpto[tokenTextLen] = CLUCENE_END_OF_WORD;
 
             assert(postingsHash[hashPos] == NULL);
@@ -947,7 +967,7 @@ void SDocumentsWriter<T>::writeSegment(std::vector<std::string> &flushedFiles) {
     IndexOutput *freqOut = directory->createOutput((segmentName + ".frq").c_str());
     // TODO:add options in field index
     IndexOutput *proxOut = nullptr;
-    if (fieldInfos->hasProx()) {
+    if (hasProx_) {
         proxOut = directory->createOutput((segmentName + ".prx").c_str());
     }
 
@@ -1007,7 +1027,7 @@ void SDocumentsWriter<T>::writeSegment(std::vector<std::string> &flushedFiles) {
     // Record all files we have flushed
     flushedFiles.push_back(segmentFileName(IndexFileNames::FIELD_INFOS_EXTENSION));
     flushedFiles.push_back(segmentFileName(IndexFileNames::FREQ_EXTENSION));
-    if (fieldInfos->hasProx()) {
+    if (hasProx_) {
         flushedFiles.push_back(segmentFileName(IndexFileNames::PROX_EXTENSION));
     }
     flushedFiles.push_back(segmentFileName(IndexFileNames::TERMS_EXTENSION));
@@ -1129,7 +1149,7 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
 
         int64_t freqPointer = freqOut->getFilePointer();
         int64_t proxPointer = 0;
-        if (fieldInfos->hasProx()) {
+        if (hasProx_) {
             proxPointer = proxOut->getFilePointer();
         }
 
@@ -1154,7 +1174,7 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
         while (numToMerge > 0) {
 
             if ((++df % skipInterval) == 0) {
-                if (fieldInfos->hasProx()) {
+                if (hasProx_) {
                     freqOut->writeByte((char)CodeMode::kPfor);
                     freqOut->writeVInt(docDeltaBuffer.size());
                     // doc
@@ -1187,7 +1207,7 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
             // changing the format to match Lucene's segment
             // format.
 
-            if (fieldInfos->hasProx()) {
+            if (hasProx_) {
                 // position
                 for (int32_t j = 0; j < termDocFreq; j++) {
                     const int32_t code = prox.readVInt();
@@ -1235,7 +1255,7 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
 
         // Done merging this term
         {
-            if (fieldInfos->hasProx()) {
+            if (hasProx_) {
                 freqOut->writeByte((char)CodeMode::kDefault);
                 freqOut->writeVInt(docDeltaBuffer.size());
                 uint32_t lastDoc = 0;
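The addPosition changes cache each posting's term as a std::basic_string_view<T> and compare candidates with eq(), which for single-byte terms calls StringUtil::memequalSSE2Wide instead of the old character-by-character postingEquals walk over the char pool. A self-contained sketch of the same open-addressing probe (hypothetical MiniPosting; plain string_view equality stands in for eq()):

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <string_view>
    #include <vector>

    // Hypothetical posting entry holding a cached view of its term text,
    // mirroring the term_ field added to Posting in SDocumentWriter.h below.
    struct MiniPosting {
        std::string_view term;
    };

    // Same probe shape as addPosition: on a collision, step by ((code >> 8) + code) | 1.
    // The increment is odd, hence coprime with the power-of-two table size, so the
    // probe eventually reaches either an empty slot or the matching term.
    MiniPosting* findOrNull(const std::vector<MiniPosting*>& table,
                            std::string_view term, uint32_t code) {
        const uint32_t mask = static_cast<uint32_t>(table.size()) - 1;  // size is a power of two
        uint32_t pos = code & mask;
        MiniPosting* p = table[pos];
        if (p != nullptr && p->term != term) {
            const uint32_t inc = ((code >> 8) + code) | 1;
            do {
                code += inc;
                pos = code & mask;
                p = table[pos];
            } while (p != nullptr && p->term != term);
        }
        return p;
    }

    int main() {
        std::vector<MiniPosting*> table(8, nullptr);
        MiniPosting apple{"apple"};
        const uint32_t code = static_cast<uint32_t>(std::hash<std::string_view>{}("apple"));
        table[code & 7] = &apple;                           // insert at its home slot
        MiniPosting* found = findOrNull(table, "apple", code);
        std::cout << (found ? found->term : std::string_view("not found")) << "\n";
        const uint32_t pearCode = static_cast<uint32_t>(std::hash<std::string_view>{}("pear"));
        std::cout << (findOrNull(table, "pear", pearCode) == nullptr) << "\n";  // 1: not present
    }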
diff --git a/src/core/CLucene/index/SDocumentWriter.h b/src/core/CLucene/index/SDocumentWriter.h
index f1da76fa..5163c2a1 100644
--- a/src/core/CLucene/index/SDocumentWriter.h
+++ b/src/core/CLucene/index/SDocumentWriter.h
@@ -53,6 +53,7 @@ private:
     std::vector<uint32_t> freqBuffer;
     std::ostream* infoStream{};
     int64_t ramBufferSize;
+    bool hasProx_ = false;
 
 public:
     class FieldMergeState;
@@ -68,6 +69,7 @@ public:
         int32_t lastDocID;   // Last docID where this term occurred
         int32_t lastDocCode; // Code for prior doc
         int32_t lastPosition;// Last position where this term occurred
+        std::basic_string_view<T> term_;
     };
 
     /* Stores norms, buffered in RAM, until they are flushed
@@ -672,7 +674,6 @@ public:
         this->infoStream = nullptr;
         fieldInfos = _CLNEW FieldInfos();
 
-
         this->closed = this->flushPending = false;
         postingsFreeCountDW = postingsAllocCountDW = 0;
         docStoreOffset = nextDocID = numDocsInRAM = numDocsInStore = nextWriteDocID = 0;
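The new hasProx_ member snapshots fieldInfos->hasProx() when a document is initialized, so the flush paths in writeSegment/appendPostings test a plain bool rather than calling back into FieldInfos on every branch. A small sketch of that hoisting pattern (MiniFieldInfos and the byte counts are made up for illustration):

    #include <cstdint>
    #include <vector>

    // Hypothetical stand-in for FieldInfos::hasProx().
    struct MiniFieldInfos {
        std::vector<bool> storeProx;
        bool hasProx() const {
            for (bool b : storeProx) if (b) return true;
            return false;
        }
    };

    // Flush-path sketch: read the flag once into a local/member instead of
    // re-evaluating it on every iteration of the merge loop.
    int64_t writeSegmentSketch(const MiniFieldInfos& infos, int32_t numTerms) {
        const bool hasProx = infos.hasProx();   // snapshot, like hasProx_ in the diff
        int64_t bytes = 0;
        for (int32_t i = 0; i < numTerms; ++i) {
            bytes += 4;                // freq/doc data is always written
            if (hasProx) bytes += 2;   // prox data only when some field stores positions
        }
        return bytes;
    }

    int main() {
        MiniFieldInfos infos{{false, true}};
        return writeSegmentSketch(infos, 10) == 60 ? 0 : 1;
    }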
diff --git a/src/core/CLucene/store/IndexOutput.h b/src/core/CLucene/store/IndexOutput.h
index d6887f58..6b6ca321 100644
--- a/src/core/CLucene/store/IndexOutput.h
+++ b/src/core/CLucene/store/IndexOutput.h
@@ -117,7 +117,7 @@ public:
 /** Base implementation class for buffered {@link IndexOutput}. */
 class CLUCENE_EXPORT BufferedIndexOutput : public IndexOutput{
 public:
-	LUCENE_STATIC_CONSTANT(int32_t, BUFFER_SIZE=16384);
+	LUCENE_STATIC_CONSTANT(int32_t, BUFFER_SIZE=65536);
 private:
 	uint8_t* buffer;
 	int64_t bufferStart;			  // position in file of buffer
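Raising BufferedIndexOutput::BUFFER_SIZE from 16 KiB to 64 KiB means each open output flushes to the Directory a quarter as often, at the cost of 48 KiB more heap per open output. A trivial sketch of the arithmetic (the 1 MiB figure is only an example):

    #include <cstdint>
    #include <iostream>

    int main() {
        const int64_t bytesWritten = 1 << 20;           // e.g. 1 MiB of postings output
        for (int64_t bufferSize : {16384, 65536}) {     // old vs. new BUFFER_SIZE
            const int64_t flushes = (bytesWritten + bufferSize - 1) / bufferSize;
            std::cout << bufferSize << " B buffer -> " << flushes << " flushes\n";
        }
    }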
diff --git a/src/core/CLucene/util/stringUtil.h b/src/core/CLucene/util/stringUtil.h
index 01bbb7fa..da0547d6 100644
--- a/src/core/CLucene/util/stringUtil.h
+++ b/src/core/CLucene/util/stringUtil.h
@@ -111,4 +111,98 @@ public:
     return (char)LUT[c];
 }
 
+class StringUtil {
+public:
+    template <typename T>
+    static inline T unaligned_load(const void* address) {
+        T res {};
+        memcpy(&res, address, sizeof(res));
+        return res;
+    }
+
+#if defined(__SSE2__)
+
+    static inline bool compareSSE2(const char* p1, const char* p2) {
+        return 0xFFFF == _mm_movemask_epi8(_mm_cmpeq_epi8(
+                                 _mm_loadu_si128(reinterpret_cast<const __m128i*>(p1)),
+                                 _mm_loadu_si128(reinterpret_cast<const __m128i*>(p2))));
+    }
+
+    static inline bool compareSSE2x4(const char* p1, const char* p2) {
+        return 0xFFFF ==
+               _mm_movemask_epi8(_mm_and_si128(
+                       _mm_and_si128(
+                               _mm_cmpeq_epi8(
+                                       _mm_loadu_si128(reinterpret_cast<const __m128i*>(p1)),
+                                       _mm_loadu_si128(reinterpret_cast<const __m128i*>(p2))),
+                               _mm_cmpeq_epi8(
+                                       _mm_loadu_si128(reinterpret_cast<const __m128i*>(p1) + 1),
+                                       _mm_loadu_si128(reinterpret_cast<const __m128i*>(p2) + 1))),
+                       _mm_and_si128(
+                               _mm_cmpeq_epi8(
+                                       _mm_loadu_si128(reinterpret_cast<const __m128i*>(p1) + 2),
+                                       _mm_loadu_si128(reinterpret_cast<const __m128i*>(p2) + 2)),
+                               _mm_cmpeq_epi8(
+                                       _mm_loadu_si128(reinterpret_cast<const __m128i*>(p1) + 3),
+                                       _mm_loadu_si128(reinterpret_cast<const __m128i*>(p2) +
+                                                       3)))));
+    }
+
+    static inline bool memequalSSE2Wide(const char* p1, const char* p2, size_t size) {
+        if (size <= 16) {
+            if (size >= 8) {
+                /// Chunks of [8,16] bytes.
+                return unaligned_load<uint64_t>(p1) == unaligned_load<uint64_t>(p2) &&
+                       unaligned_load<uint64_t>(p1 + size - 8) ==
+                               unaligned_load<uint64_t>(p2 + size - 8);
+            } else if (size >= 4) {
+                /// Chunks of [4,7] bytes.
+                return unaligned_load<uint32_t>(p1) == unaligned_load<uint32_t>(p2) &&
+                       unaligned_load<uint32_t>(p1 + size - 4) ==
+                               unaligned_load<uint32_t>(p2 + size - 4);
+            } else if (size >= 2) {
+                /// Chunks of [2,3] bytes.
+                return unaligned_load<uint16_t>(p1) == unaligned_load<uint16_t>(p2) &&
+                       unaligned_load<uint16_t>(p1 + size - 2) ==
+                               unaligned_load<uint16_t>(p2 + size - 2);
+            } else if (size >= 1) {
+                /// A single byte.
+                return *p1 == *p2;
+            }
+            return true;
+        }
+
+        while (size >= 64) {
+            if (compareSSE2x4(p1, p2)) {
+                p1 += 64;
+                p2 += 64;
+                size -= 64;
+            } else {
+                return false;
+            }
+        }
+
+        switch (size / 16) {
+        case 3:
+            if (!compareSSE2(p1 + 32, p2 + 32)) {
+                return false;
+            }
+            [[fallthrough]];
+        case 2:
+            if (!compareSSE2(p1 + 16, p2 + 16)) {
+                return false;
+            }
+            [[fallthrough]];
+        case 1:
+            if (!compareSSE2(p1, p2)) {
+                return false;
+            }
+        }
+
+        return compareSSE2(p1 + size - 16, p2 + size - 16);
+    }
+
+#endif
+};
+
 #endif//_lucene_util__stringutil_H
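A hedged usage sketch of the new StringUtil::memequalSSE2Wide (assuming the surrounding stringUtil.h pulls in the intrinsics/string headers it needs and that the include path matches the diff; on non-SSE2 targets the helper is not compiled, so the sketch falls back to memcmp):

    #include <cstring>
    #include <iostream>

    #include "CLucene/util/stringUtil.h"  // path from the diff; adjust to your include setup

    int main() {
        const char* a = "optimize index write";
        const char* b = "optimize index write";
        const char* c = "optimize index wrote";
        const auto n = std::strlen(a);
    #if defined(__SSE2__)
        // Tiered compare: 1/2/4/8-byte loads for short inputs, 16/64-byte SSE2
        // chunks for longer ones, finishing with one overlapping 16-byte compare.
        std::cout << StringUtil::memequalSSE2Wide(a, b, n) << "\n";  // 1
        std::cout << StringUtil::memequalSSE2Wide(a, c, n) << "\n";  // 0
    #else
        std::cout << (std::memcmp(a, b, n) == 0) << "\n";
        std::cout << (std::memcmp(a, c, n) == 0) << "\n";
    #endif
    }

The overlapping final 16-byte compare is what lets the longer path handle a 17-63 byte tail without a scalar remainder loop.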

