You are viewing a plain-text version of this message; the link to the canonical version was not preserved in this copy.
Posted to commits@doris.apache.org by ji...@apache.org on 2023/06/14 03:09:41 UTC
[doris-thirdparty] branch clucene updated: [improvement]optimize reduce index write time (#87)
This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch clucene
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/clucene by this push:
new 0f33e06d [improvement]optimize reduce index write time (#87)
0f33e06d is described below
commit 0f33e06d5c18a073b5858179a0a7d4daebee01b8
Author: zzzxl <33...@users.noreply.github.com>
AuthorDate: Wed Jun 14 11:09:33 2023 +0800
[improvement]optimize reduce index write time (#87)
---
src/core/CLucene/analysis/AnalysisHeader.cpp | 7 ---
src/core/CLucene/analysis/AnalysisHeader.h | 33 +++++++++-
src/core/CLucene/analysis/Analyzers.cpp | 2 +-
src/core/CLucene/analysis/Analyzers.h | 23 ++++---
src/core/CLucene/index/SDocumentWriter.cpp | 50 ++++++++++-----
src/core/CLucene/index/SDocumentWriter.h | 3 +-
src/core/CLucene/store/IndexOutput.h | 2 +-
src/core/CLucene/util/stringUtil.h | 94 ++++++++++++++++++++++++++++
8 files changed, 178 insertions(+), 36 deletions(-)
diff --git a/src/core/CLucene/analysis/AnalysisHeader.cpp b/src/core/CLucene/analysis/AnalysisHeader.cpp
index a7fd819e..f90085c0 100644
--- a/src/core/CLucene/analysis/AnalysisHeader.cpp
+++ b/src/core/CLucene/analysis/AnalysisHeader.cpp
@@ -43,13 +43,6 @@ size_t Token::termLength<TCHAR>(){
return _termTextLen;
};
-template <>
-size_t Token::termLength<char>(){
- if ( _termTextLen == -1 ) //it was invalidated by growBuffer
- _termTextLen = strlen((char*)_buffer);
- return _termTextLen;
-};
-
///Compares the Token for their order
class OrderCompare:LUCENE_BASE, public CL_NS(util)::Compare::_base //<Token*>
{
diff --git a/src/core/CLucene/analysis/AnalysisHeader.h b/src/core/CLucene/analysis/AnalysisHeader.h
index cbe0453d..07beb9e4 100644
--- a/src/core/CLucene/analysis/AnalysisHeader.h
+++ b/src/core/CLucene/analysis/AnalysisHeader.h
@@ -68,6 +68,7 @@ private:
size_t bufferTextLen;
void *_buffer; ///< the text of the term
int32_t _termTextLen;///< the length of termText. Internal use only
+ bool isNoCopy = false;
CL_NS(index)::Payload *payload;
@@ -84,7 +85,10 @@ public:
bufferTextLen = 0;
};
virtual ~Token() {
- free(_buffer);
+ // Free _buffer only when this Token owns it. After setTextNoCopy() the pointer
+ // refers to caller-owned memory (e.g. the buffer SimpleTokenizer<char>::next
+ // passes to setNoCopy()), which must not be freed here.
+ if (!isNoCopy) {
+ free(_buffer);
+ _buffer = nullptr;
+ }
_CLLDELETE(payload);
};
@@ -99,6 +103,15 @@ public:
setText(text, end - start);
};
+    /**
+     * Borrowing counterpart of set(): records the start/end offsets and the
+     * token type (the default type when `typ` is NULL), resets the position
+     * increment to 1, and points the term at `text` without copying it.
+     * `text` must remain valid for as long as this token's term is read.
+     */
+    template<typename T>
+    void setNoCopy(const T *text, const int32_t start, const int32_t end, const TCHAR *typ = NULL) {
+        const int32_t termLen = end - start;
+        positionIncrement = 1;
+        _type = (typ != NULL) ? typ : getDefaultType();
+        _startOffset = start;
+        _endOffset = end;
+        setTextNoCopy(text, termLen);
+    };
+
size_t bufferLength() const {
return bufferTextLen;
}
@@ -132,12 +145,12 @@ public:
int32_t getPositionIncrement() const { return positionIncrement; }
template<typename T>
- T *termBuffer() const {
+ // Raw term storage viewed as T*; no copy is made.
+ // NOTE(review): a member function defined inside the class body is already
+ // implicitly inline, so the `inline` keyword below has no effect.
+ inline T *termBuffer() const {
return (T *) _buffer;
}
template<typename T>
- size_t termLength();//< Length of the the termBuffer. See #termBuffer
+ inline size_t termLength();//< Length of the termBuffer. See #termBuffer
void resetTermTextLen() {
_termTextLen = -1;
@@ -152,6 +165,13 @@ public:
((T *) _buffer)[_termTextLen] = 0;//make sure null terminated
};
+    /**
+     * Points the term at caller-owned characters instead of copying them.
+     *
+     * @param text  term characters; neither copied nor freed by this Token, so
+     *              they must stay valid for as long as the term is read.
+     * @param l     term length in characters (stored as _termTextLen).
+     *
+     * If the token currently owns its buffer (isNoCopy is false, i.e. the
+     * buffer ~Token() would free), that buffer is released first. Once
+     * isNoCopy is set the destructor no longer frees _buffer, so overwriting
+     * the pointer alone leaked the owned allocation. The recorded capacity is
+     * cleared with it because it described the freed allocation, not the
+     * borrowed memory.
+     *
+     * NOTE(review): nothing visible here ever clears isNoCopy again. Confirm
+     * that a later copying setText()/growBuffer() does not write into or
+     * realloc() the borrowed pointer.
+     */
+    template<typename T>
+    void setTextNoCopy(const T *text, int32_t l) {
+        if (!isNoCopy) {
+            free(_buffer);      // free(NULL) is a no-op for a fresh token
+            bufferTextLen = 0;
+        }
+        _termTextLen = l;
+        _buffer = (void*)text;
+        isNoCopy = true;
+    };
+
int32_t startOffset() const {
return _startOffset;
}
@@ -196,6 +216,13 @@ public:
}
};
+/**
+ * Length of the char term. A cached value of -1 means the length was
+ * invalidated (by growBuffer, or explicitly via resetTermTextLen()), so it is
+ * recomputed from the NUL-terminated buffer. A token that has no buffer yet
+ * reports 0 instead of calling strlen(nullptr).
+ *
+ * The full specialization lives in the header, so it must be `inline` to avoid
+ * multiple definitions across translation units. No trailing ';' after the
+ * body: that is an empty declaration flagged by -Wextra-semi / -pedantic.
+ */
+template <>
+inline size_t Token::termLength<char>() {
+    if (_termTextLen == -1) {
+        _termTextLen = (_buffer == nullptr)
+                ? 0
+                : static_cast<int32_t>(strlen(static_cast<const char *>(_buffer)));
+    }
+    return _termTextLen;
+}
+
class CLUCENE_EXPORT TokenStream {
public:
/** Returns the next token in the stream, or null at EOS.
diff --git a/src/core/CLucene/analysis/Analyzers.cpp b/src/core/CLucene/analysis/Analyzers.cpp
index ebaa82b6..3ea3e8b0 100644
--- a/src/core/CLucene/analysis/Analyzers.cpp
+++ b/src/core/CLucene/analysis/Analyzers.cpp
@@ -94,7 +94,7 @@ Token *SimpleTokenizer<char>::next(Token *token) {
break; // return 'em
}
buffer[length] = 0;
- token->set(buffer, start, start + length);
+ token->setNoCopy(buffer, start, start + length);
return token;
};
diff --git a/src/core/CLucene/analysis/Analyzers.h b/src/core/CLucene/analysis/Analyzers.h
index f5da43c6..17f88cff 100644
--- a/src/core/CLucene/analysis/Analyzers.h
+++ b/src/core/CLucene/analysis/Analyzers.h
@@ -184,16 +184,23 @@ public:
return _CLNEW SimpleTokenizer<T>(reader);
}
TokenStream* reusableTokenStream(const TCHAR* fieldName, CL_NS(util)::Reader* reader) override{
- auto* tokenizer = static_cast<Tokenizer*>(getPreviousTokenStream());
- if (tokenizer == nullptr) {
- tokenizer = _CLNEW SimpleTokenizer<T>(reader);
- setPreviousTokenStream(tokenizer);
- } else
- tokenizer->reset(reader);
- return tokenizer;
+        // Lazily create a single tokenizer owned by this analyzer (released in
+        // its destructor) and re-point it at each new reader, instead of
+        // storing it via setPreviousTokenStream().
+        // NOTE(review): the removed code kept the stream in get/
+        // setPreviousTokenStream() storage, which may have been per-thread; a
+        // plain member is shared by every caller of this analyzer instance.
+        // Confirm an analyzer is never used from two threads at once.
+        if (tokenizer_ == nullptr) {
+            // _CLNEW, matching the allocation in the method above and in the
+            // code this replaces.
+            tokenizer_ = _CLNEW SimpleTokenizer<T>(reader);
+        } else {
+            tokenizer_->reset(reader);
+        }
+        return tokenizer_;
};
- virtual ~SimpleAnalyzer(){}
+    /** Destroys the tokenizer created by reusableTokenStream(), if any. */
+    virtual ~SimpleAnalyzer() {
+        delete tokenizer_; // no-op while it is still nullptr
+        tokenizer_ = nullptr;
+    }
+
+private:
+    // Tokenizer owned by this analyzer: created lazily by reusableTokenStream()
+    // and reused for every later reader; nullptr until first requested.
+    // NOTE(review): the class still has implicit copy operations, so copying a
+    // SimpleAnalyzer would share this pointer and delete it twice. Consider
+    // deleting copy construction/assignment if no caller copies analyzers.
+    SimpleTokenizer<T>* tokenizer_ = nullptr;
};
/**
diff --git a/src/core/CLucene/index/SDocumentWriter.cpp b/src/core/CLucene/index/SDocumentWriter.cpp
index 6c06519c..3b22fdad 100644
--- a/src/core/CLucene/index/SDocumentWriter.cpp
+++ b/src/core/CLucene/index/SDocumentWriter.cpp
@@ -140,15 +140,18 @@ typename SDocumentsWriter<T>::ThreadState *SDocumentsWriter<T>::getThreadState(D
threadState = _CLNEW ThreadState(this);
}
- ThreadState *state = threadState;
- if (segment.empty())
+ if (segment.empty()) {
segment = writer->newSegmentName();
- state->init(doc, nextDocID);
+ threadState->init(doc, nextDocID);
+ }
+
+ threadState->docID = nextDocID;
+
// Only increment nextDocID & numDocsInRAM on successful init
nextDocID++;
numDocsInRAM++;
- return state;
+ return threadState;
}
template<typename T>
@@ -244,6 +247,7 @@ void SDocumentsWriter<T>::ThreadState::init(Document *doc, int32_t doc_id) {
fp->docFields.values[fp->fieldCount++] = field;
}
+ _parent->hasProx_ = _parent->fieldInfos->hasProx();
}
template<typename T>
@@ -313,7 +317,7 @@ void SDocumentsWriter<T>::ThreadState::FieldData::processField(Analyzer *sanalyz
threadState->numStoredFields++;
}
- docFieldsFinal.values[j] = NULL;
+ // docFieldsFinal.values[j] = NULL;
}
} catch (exception &ae) {
throw ae;
@@ -440,10 +444,27 @@ void SDocumentsWriter<T>::ThreadState::writeProxBytes(const uint8_t *b, int32_t
}
}
+/**
+ * Equality test for two term views, used when probing the postings hash.
+ * For char terms on SSE2 builds the lengths are compared first and the bytes
+ * are then checked with StringUtil::memequalSSE2Wide; every other
+ * instantiation falls back to std::basic_string_view's operator==.
+ */
+template <typename T>
+inline bool eq(const std::basic_string_view<T>& a, const std::basic_string_view<T>& b) {
+#if defined(__SSE2__)
+    if constexpr (std::is_same_v<T, char>) {
+        const bool sameLength = (a.size() == b.size());
+        return sameLength && StringUtil::memequalSSE2Wide(a.data(), b.data(), a.size());
+    }
+#endif
+    return a == b;
+}
+
template<typename T>
void SDocumentsWriter<T>::ThreadState::FieldData::addPosition(Token *token) {
const T *tokenText = token->termBuffer<T>();
const int32_t tokenTextLen = token->termLength<T>();
+ std::basic_string_view<T> term(tokenText, tokenTextLen);
+ // if constexpr (std::is_same_v<T, char>) {
+ // std::cout << term << std::endl;
+ // }
uint32_t code = 0;
// Compute hashcode
@@ -459,16 +480,14 @@ void SDocumentsWriter<T>::ThreadState::FieldData::addPosition(Token *token) {
// Locate Posting in hash
threadState->p = postingsHash[hashPos];
- if (threadState->p != nullptr && !threadState->postingEquals(tokenText, tokenTextLen)) {
- // Conflict: keep searching different locations in
- // the hash table.
+ if (threadState->p != nullptr && !eq(threadState->p->term_, term)) {
const uint32_t inc = ((code >> 8) + code) | 1;
do {
postingsHashConflicts++;
code += inc;
hashPos = code & postingsHashMask;
threadState->p = postingsHash[hashPos];
- } while (threadState->p != nullptr && !threadState->postingEquals(tokenText, tokenTextLen));
+ } while (threadState->p != nullptr && !eq(threadState->p->term_, term));
}
int32_t proxCode = 0;
@@ -530,6 +549,7 @@ void SDocumentsWriter<T>::ThreadState::FieldData::addPosition(Token *token) {
threadState->scharPool->tUpto += textLen1;
memcpy(textUpto, tokenText, tokenTextLen * sizeof(T));
+ threadState->p->term_ = std::basic_string_view<T>(textUpto, term.size());
textUpto[tokenTextLen] = CLUCENE_END_OF_WORD;
assert(postingsHash[hashPos] == NULL);
@@ -947,7 +967,7 @@ void SDocumentsWriter<T>::writeSegment(std::vector<std::string> &flushedFiles) {
IndexOutput *freqOut = directory->createOutput((segmentName + ".frq").c_str());
// TODO:add options in field index
IndexOutput *proxOut = nullptr;
- if (fieldInfos->hasProx()) {
+ if (hasProx_) {
proxOut = directory->createOutput((segmentName + ".prx").c_str());
}
@@ -1007,7 +1027,7 @@ void SDocumentsWriter<T>::writeSegment(std::vector<std::string> &flushedFiles) {
// Record all files we have flushed
flushedFiles.push_back(segmentFileName(IndexFileNames::FIELD_INFOS_EXTENSION));
flushedFiles.push_back(segmentFileName(IndexFileNames::FREQ_EXTENSION));
- if (fieldInfos->hasProx()) {
+ if (hasProx_) {
flushedFiles.push_back(segmentFileName(IndexFileNames::PROX_EXTENSION));
}
flushedFiles.push_back(segmentFileName(IndexFileNames::TERMS_EXTENSION));
@@ -1129,7 +1149,7 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
int64_t freqPointer = freqOut->getFilePointer();
int64_t proxPointer = 0;
- if (fieldInfos->hasProx()) {
+ if (hasProx_) {
proxPointer = proxOut->getFilePointer();
}
@@ -1154,7 +1174,7 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
while (numToMerge > 0) {
if ((++df % skipInterval) == 0) {
- if (fieldInfos->hasProx()) {
+ if (hasProx_) {
freqOut->writeByte((char)CodeMode::kPfor);
freqOut->writeVInt(docDeltaBuffer.size());
// doc
@@ -1187,7 +1207,7 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
// changing the format to match Lucene's segment
// format.
- if (fieldInfos->hasProx()) {
+ if (hasProx_) {
// position
for (int32_t j = 0; j < termDocFreq; j++) {
const int32_t code = prox.readVInt();
@@ -1235,7 +1255,7 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
// Done merging this term
{
- if (fieldInfos->hasProx()) {
+ if (hasProx_) {
freqOut->writeByte((char)CodeMode::kDefault);
freqOut->writeVInt(docDeltaBuffer.size());
uint32_t lastDoc = 0;
diff --git a/src/core/CLucene/index/SDocumentWriter.h b/src/core/CLucene/index/SDocumentWriter.h
index f1da76fa..5163c2a1 100644
--- a/src/core/CLucene/index/SDocumentWriter.h
+++ b/src/core/CLucene/index/SDocumentWriter.h
@@ -53,6 +53,7 @@ private:
std::vector<uint32_t> freqBuffer;
std::ostream* infoStream{};
int64_t ramBufferSize;
+ bool hasProx_ = false;
public:
class FieldMergeState;
@@ -68,6 +69,7 @@ public:
int32_t lastDocID; // Last docID where this term occurred
int32_t lastDocCode; // Code for prior doc
int32_t lastPosition;// Last position where this term occurred
+ std::basic_string_view<T> term_;
};
/* Stores norms, buffered in RAM, until they are flushed
@@ -672,7 +674,6 @@ public:
this->infoStream = nullptr;
fieldInfos = _CLNEW FieldInfos();
-
this->closed = this->flushPending = false;
postingsFreeCountDW = postingsAllocCountDW = 0;
docStoreOffset = nextDocID = numDocsInRAM = numDocsInStore = nextWriteDocID = 0;
diff --git a/src/core/CLucene/store/IndexOutput.h b/src/core/CLucene/store/IndexOutput.h
index d6887f58..6b6ca321 100644
--- a/src/core/CLucene/store/IndexOutput.h
+++ b/src/core/CLucene/store/IndexOutput.h
@@ -117,7 +117,7 @@ public:
/** Base implementation class for buffered {@link IndexOutput}. */
class CLUCENE_EXPORT BufferedIndexOutput : public IndexOutput{
public:
- LUCENE_STATIC_CONSTANT(int32_t, BUFFER_SIZE=16384);
+ LUCENE_STATIC_CONSTANT(int32_t, BUFFER_SIZE=65536);
private:
uint8_t* buffer;
int64_t bufferStart; // position in file of buffer
diff --git a/src/core/CLucene/util/stringUtil.h b/src/core/CLucene/util/stringUtil.h
index 01bbb7fa..da0547d6 100644
--- a/src/core/CLucene/util/stringUtil.h
+++ b/src/core/CLucene/util/stringUtil.h
@@ -111,4 +111,98 @@ public:
return (char)LUT[c];
}
+// Low-level byte-comparison helpers. memequalSSE2Wide() backs the SSE2 char
+// path of term equality (eq) in SDocumentWriter.cpp.
+// NOTE(review): this code needs memcpy (<cstring>) and, when __SSE2__ is
+// defined, the SSE2 intrinsics (<emmintrin.h>). The includes at the top of
+// this header are not part of this hunk; confirm they are present.
+class StringUtil {
+public:
+ // Reads a T from `address` without requiring alignment: memcpy into a local
+ // instead of dereferencing a possibly misaligned pointer.
+ template <typename T>
+ static inline T unaligned_load(const void* address) {
+ T res {};
+ memcpy(&res, address, sizeof(res));
+ return res;
+ }
+
+#if defined(__SSE2__)
+
+ // True iff the 16 bytes at p1 and p2 are identical (unaligned loads).
+ // movemask yields 0xFFFF only when every byte lane compared equal.
+ static inline bool compareSSE2(const char* p1, const char* p2) {
+ return 0xFFFF == _mm_movemask_epi8(_mm_cmpeq_epi8(
+ _mm_loadu_si128(reinterpret_cast<const __m128i*>(p1)),
+ _mm_loadu_si128(reinterpret_cast<const __m128i*>(p2))));
+ }
+
+ // True iff the 64 bytes at p1 and p2 are identical. The four 16-byte
+ // comparisons are ANDed together so one movemask decides the whole block.
+ static inline bool compareSSE2x4(const char* p1, const char* p2) {
+ return 0xFFFF ==
+ _mm_movemask_epi8(_mm_and_si128(
+ _mm_and_si128(
+ _mm_cmpeq_epi8(
+ _mm_loadu_si128(reinterpret_cast<const __m128i*>(p1)),
+ _mm_loadu_si128(reinterpret_cast<const __m128i*>(p2))),
+ _mm_cmpeq_epi8(
+ _mm_loadu_si128(reinterpret_cast<const __m128i*>(p1) + 1),
+ _mm_loadu_si128(reinterpret_cast<const __m128i*>(p2) + 1))),
+ _mm_and_si128(
+ _mm_cmpeq_epi8(
+ _mm_loadu_si128(reinterpret_cast<const __m128i*>(p1) + 2),
+ _mm_loadu_si128(reinterpret_cast<const __m128i*>(p2) + 2)),
+ _mm_cmpeq_epi8(
+ _mm_loadu_si128(reinterpret_cast<const __m128i*>(p1) + 3),
+ _mm_loadu_si128(reinterpret_cast<const __m128i*>(p2) +
+ 3)))));
+ }
+
+ // Returns whether the `size` bytes at p1 equal the `size` bytes at p2.
+ // Both ranges must be readable for `size` bytes; no load reads outside
+ // [p, p + size).
+ // - size <= 16: a head load and a tail load of the widest integer width
+ //   that fits (they may overlap), so no byte-by-byte loop is needed.
+ // - size > 16: compare 64-byte blocks, then the remaining whole 16-byte
+ //   chunks, then the final 16 bytes ending at p + size. That last load may
+ //   re-check bytes already compared, which is harmless.
+ static inline bool memequalSSE2Wide(const char* p1, const char* p2, size_t size) {
+ if (size <= 16) {
+ if (size >= 8) {
+ /// Chunks of [8,16] bytes.
+ return unaligned_load<uint64_t>(p1) == unaligned_load<uint64_t>(p2) &&
+ unaligned_load<uint64_t>(p1 + size - 8) ==
+ unaligned_load<uint64_t>(p2 + size - 8);
+ } else if (size >= 4) {
+ /// Chunks of [4,7] bytes.
+ return unaligned_load<uint32_t>(p1) == unaligned_load<uint32_t>(p2) &&
+ unaligned_load<uint32_t>(p1 + size - 4) ==
+ unaligned_load<uint32_t>(p2 + size - 4);
+ } else if (size >= 2) {
+ /// Chunks of [2,3] bytes.
+ return unaligned_load<uint16_t>(p1) == unaligned_load<uint16_t>(p2) &&
+ unaligned_load<uint16_t>(p1 + size - 2) ==
+ unaligned_load<uint16_t>(p2 + size - 2);
+ } else if (size >= 1) {
+ /// A single byte.
+ return *p1 == *p2;
+ }
+ // Empty ranges are equal.
+ return true;
+ }
+
+ // Whole 64-byte blocks; advance both pointers past each matching block.
+ while (size >= 64) {
+ if (compareSSE2x4(p1, p2)) {
+ p1 += 64;
+ p2 += 64;
+ size -= 64;
+ } else {
+ return false;
+ }
+ }
+
+ // Remaining whole 16-byte chunks (0 to 3 of them); the cases fall through.
+ switch (size / 16) {
+ case 3:
+ if (!compareSSE2(p1 + 32, p2 + 32)) {
+ return false;
+ }
+ [[fallthrough]];
+ case 2:
+ if (!compareSSE2(p1 + 16, p2 + 16)) {
+ return false;
+ }
+ [[fallthrough]];
+ case 1:
+ if (!compareSSE2(p1, p2)) {
+ return false;
+ }
+ }
+
+ // Tail: the last 16 bytes. This stays in bounds because the original size
+ // exceeded 16, and it covers any leftover bytes not yet compared.
+ return compareSSE2(p1 + size - 16, p2 + size - 16);
+ }
+
+#endif
+};
+
#endif//_lucene_util__stringutil_H
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org