Posted to dev@ignite.apache.org by Tâm Nguyễn Mạnh <ng...@gmail.com> on 2018/09/07 02:02:27 UTC

IGNITE-7482 Cursor in TextQuery fetches all data in first call to next() or hasNext()

Hi,

I tried to implement an iterator for GridLuceneIndex. Could you please help
review it?

-- 
Thanks & Best Regards

Tam, Nguyen Manh
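
For context on the issue title: the change proposed in this thread replaces a cursor that loads every hit on the first `next()`/`hasNext()` with one that requests `pageSize` hits via `IndexSearcher.search` and then continues from the last hit of the previous page via `IndexSearcher.searchAfter`. The pattern can be sketched without Lucene. This is a minimal illustration under stated assumptions, not Ignite code: `PageSource`, `PagedIterator` and `CountingRangeSource` are hypothetical names, and `fetchAfter` stands in for `search`/`searchAfter`. The sketch also keeps requesting pages when every hit on a page is rejected by the filter, so a fully filtered page does not end the iteration early.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Sketch (not Ignite code) of a lazily paged, filtered iterator. */
public class PagedIteratorSketch {
    /** Hypothetical stand-in for IndexSearcher.search/searchAfter: cursor == null means first page. */
    public interface PageSource<T> {
        List<T> fetchAfter(T cursor, int pageSize);
    }

    /** Requests one page at a time and applies the filter across page boundaries. */
    public static final class PagedIterator<T> implements Iterator<T> {
        private final PageSource<T> src;
        private final Predicate<? super T> filter;
        private final int pageSize;
        private List<T> batch;     // Current page; null before the first request.
        private int pos;           // Index of the next unread hit in batch.
        private boolean exhausted; // Set once the source returns an empty page.
        private boolean hasNext;   // Whether 'next' holds a prefetched element.
        private T next;            // Prefetched element that passed the filter.

        public PagedIterator(PageSource<T> src, Predicate<? super T> filter, int pageSize) {
            if (pageSize <= 0)
                throw new IllegalArgumentException("pageSize must be positive");
            this.src = src;
            this.filter = filter;
            this.pageSize = pageSize;
            advance(); // Requests only the pages needed to find the first accepted hit.
        }

        /** Finds the next accepted hit, requesting more pages if a whole page is filtered out. */
        private void advance() {
            hasNext = false;
            next = null;
            while (!exhausted) {
                if (batch == null || pos == batch.size()) {
                    T cursor = batch == null ? null : batch.get(batch.size() - 1);
                    List<T> page = src.fetchAfter(cursor, pageSize);
                    if (page.isEmpty()) {
                        exhausted = true;
                        return;
                    }
                    batch = page;
                    pos = 0;
                }
                T candidate = batch.get(pos++);
                if (filter.test(candidate)) {
                    next = candidate;
                    hasNext = true;
                    return;
                }
            }
        }

        @Override public boolean hasNext() {
            return hasNext;
        }

        @Override public T next() {
            if (!hasNext)
                throw new NoSuchElementException();
            T res = next;
            advance();
            return res;
        }
    }

    /** Demo source over the integers 0..size-1 that counts page requests. */
    public static final class CountingRangeSource implements PageSource<Integer> {
        private final int size;
        public int fetches;

        public CountingRangeSource(int size) {
            this.size = size;
        }

        @Override public List<Integer> fetchAfter(Integer cursor, int pageSize) {
            fetches++;
            int start = cursor == null ? 0 : cursor + 1;
            List<Integer> page = new ArrayList<>();
            for (int i = start; i < size && page.size() < pageSize; i++)
                page.add(i);
            return page;
        }
    }

    public static <T> List<T> collect(Iterator<T> it) {
        List<T> out = new ArrayList<>();
        while (it.hasNext())
            out.add(it.next());
        return out;
    }

    public static void main(String[] args) {
        CountingRangeSource src = new CountingRangeSource(10);
        // Pages [0..2] and [3..5] contain no accepted hits; the iterator must keep going.
        PagedIterator<Integer> it = new PagedIterator<>(src, x -> x >= 7, 3);
        System.out.println(collect(it) + " after " + src.fetches + " page requests");
    }
}
```

Running `main` prints `[7, 8, 9] after 5 page requests`: three pages are requested before the first accepted hit is found, and the fifth request returns an empty page, which marks the end of the results. Prefetching one accepted element keeps `hasNext()` free of side effects, which mirrors how the proposed `It` class computes `curr` ahead of `onHasNext()`.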

Re: IGNITE-7482 Cursor in TextQuery fetches all data in first call to next() or hasNext()

Posted by Dmitriy Pavlov <dp...@gmail.com>.
Hi Tam Manh Nguyen,

I've added you to the list of contributors, so now you can assign an issue
to yourself.

Welcome to the Apache Ignite Community!

Sincerely,
Dmitriy Pavlov

On Mon, Sep 10, 2018 at 23:23, Tâm Nguyễn Mạnh <ng...@gmail.com> wrote:

> Hi,
>
> I just registered. Here is my Jira account: nguyenmanhtam123@gmail.com
>
> Thank you,
> Tamnm
>
> On Mon, Sep 10, 2018 at 3:25 PM Alexey Goncharuk <alexey.goncharuk@gmail.com> wrote:
>
> > Hi,
> >
> > Please send your Jira account ID so we can add you to the contributors
> > list. Then you will be able to assign tickets to yourself and contribute
> > to the project according to the process.
> >
> > You can get more info here:
> >
> > https://cwiki.apache.org/confluence/display/IGNITE/Development+Process
> > https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute
> >
> > --AG
> >
> > On Mon, Sep 10, 2018 at 9:16, Tâm Nguyễn Mạnh <nguyenmanhtam123@gmail.com> wrote:
> >
> > > Hi,
> > > I have not been assigned yet, but I really want to be.
> > >
> > > On Fri, Sep 7, 2018 at 4:13 PM Ilya Kasnacheev <ilya.kasnacheev@gmail.com> wrote:
> > >
> > > > Hello!
> > > >
> > > > Can you please frame it as a GitHub pull request, as per our process? Do
> > > > you have a ticket for that?
> > > >
> > > > Regards,
> > > > --
> > > > Ilya Kasnacheev
> > > >
> > > >
> > > > On Fri, Sep 7, 2018 at 5:08, Tâm Nguyễn Mạnh <nguyenmanhtam123@gmail.com> wrote:
> > > >
> > > > >
> > > > > modules\indexing\src\main\java\org\apache\ignite\internal\processors\query\h2\opt\GridLuceneIndex.java
> > > > > ```java
> > > > > /*
> > > > >  * Licensed to the Apache Software Foundation (ASF) under one or more
> > > > >  * contributor license agreements.  See the NOTICE file distributed with
> > > > >  * this work for additional information regarding copyright ownership.
> > > > >  * The ASF licenses this file to You under the Apache License, Version 2.0
> > > > >  * (the "License"); you may not use this file except in compliance with
> > > > >  * the License.  You may obtain a copy of the License at
> > > > >  *
> > > > >  *      http://www.apache.org/licenses/LICENSE-2.0
> > > > >  *
> > > > >  * Unless required by applicable law or agreed to in writing, software
> > > > >  * distributed under the License is distributed on an "AS IS" BASIS,
> > > > >  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > > > >  * See the License for the specific language governing permissions and
> > > > >  * limitations under the License.
> > > > >  */
> > > > >
> > > > > package org.apache.ignite.internal.processors.query.h2.opt;
> > > > >
> > > > > import java.io.IOException;
> > > > > import java.util.Collection;
> > > > > import java.util.concurrent.atomic.AtomicLong;
> > > > > import org.apache.ignite.IgniteCheckedException;
> > > > > import org.apache.ignite.internal.GridKernalContext;
> > > > > import org.apache.ignite.internal.processors.cache.CacheObject;
> > > > > import org.apache.ignite.internal.processors.cache.CacheObjectContext;
> > > > > import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
> > > > > import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
> > > > > import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
> > > > > import org.apache.ignite.internal.util.GridAtomicLong;
> > > > > import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
> > > > > import org.apache.ignite.internal.util.lang.GridCloseableIterator;
> > > > > import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
> > > > > import org.apache.ignite.internal.util.typedef.internal.U;
> > > > > import org.apache.ignite.lang.IgniteBiTuple;
> > > > > import org.apache.ignite.spi.indexing.IndexingQueryFilter;
> > > > > import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
> > > > > import org.apache.lucene.analysis.standard.StandardAnalyzer;
> > > > > import org.apache.lucene.document.Document;
> > > > > import org.apache.lucene.document.Field;
> > > > > import org.apache.lucene.document.LongField;
> > > > > import org.apache.lucene.document.StoredField;
> > > > > import org.apache.lucene.document.StringField;
> > > > > import org.apache.lucene.document.TextField;
> > > > > import org.apache.lucene.index.DirectoryReader;
> > > > > import org.apache.lucene.index.IndexReader;
> > > > > import org.apache.lucene.index.IndexWriter;
> > > > > import org.apache.lucene.index.IndexWriterConfig;
> > > > > import org.apache.lucene.index.Term;
> > > > > import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
> > > > > import org.apache.lucene.search.BooleanClause;
> > > > > import org.apache.lucene.search.BooleanQuery;
> > > > > import org.apache.lucene.search.IndexSearcher;
> > > > > import org.apache.lucene.search.NumericRangeQuery;
> > > > > import org.apache.lucene.search.Query;
> > > > > import org.apache.lucene.search.ScoreDoc;
> > > > > import org.apache.lucene.search.TopDocs;
> > > > > import org.apache.lucene.util.BytesRef;
> > > > > import org.h2.util.JdbcUtils;
> > > > > import org.jetbrains.annotations.Nullable;
> > > > >
> > > > > import static org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
> > > > > import static org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME;
> > > > >
> > > > > /**
> > > > >  * Lucene fulltext index.
> > > > >  */
> > > > > public class GridLuceneIndex implements AutoCloseable {
> > > > >     /** Field name for string representation of value. */
> > > > >     public static final String VAL_STR_FIELD_NAME = "_gg_val_str__";
> > > > >
> > > > >     /** Field name for value version. */
> > > > >     public static final String VER_FIELD_NAME = "_gg_ver__";
> > > > >
> > > > >     /** Field name for value expiration time. */
> > > > >     public static final String EXPIRATION_TIME_FIELD_NAME = "_gg_expires__";
> > > > >
> > > > >     /** */
> > > > >     private final String cacheName;
> > > > >
> > > > >     /** */
> > > > >     private final GridQueryTypeDescriptor type;
> > > > >
> > > > >     /** */
> > > > >     private final IndexWriter writer;
> > > > >
> > > > >     /** */
> > > > >     private final String[] idxdFields;
> > > > >
> > > > >     /** */
> > > > >     private final AtomicLong updateCntr = new GridAtomicLong();
> > > > >
> > > > >     /** */
> > > > >     private final GridLuceneDirectory dir;
> > > > >
> > > > >     /** */
> > > > >     private final GridKernalContext ctx;
> > > > >
> > > > >     /**
> > > > >      * Constructor.
> > > > >      *
> > > > >      * @param ctx Kernal context.
> > > > >      * @param cacheName Cache name.
> > > > >      * @param type Type descriptor.
> > > > >      * @throws IgniteCheckedException If failed.
> > > > >      */
> > > > >     public GridLuceneIndex(GridKernalContext ctx, @Nullable String cacheName, GridQueryTypeDescriptor type)
> > > > >         throws IgniteCheckedException {
> > > > >         this.ctx = ctx;
> > > > >         this.cacheName = cacheName;
> > > > >         this.type = type;
> > > > >
> > > > >         dir = new GridLuceneDirectory(new GridUnsafeMemory(0));
> > > > >
> > > > >         try {
> > > > >             writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()));
> > > > >         }
> > > > >         catch (IOException e) {
> > > > >             throw new IgniteCheckedException(e);
> > > > >         }
> > > > >
> > > > >         GridQueryIndexDescriptor idx = type.textIndex();
> > > > >
> > > > >         if (idx != null) {
> > > > >             Collection<String> fields = idx.fields();
> > > > >
> > > > >             idxdFields = new String[fields.size() + 1];
> > > > >
> > > > >             fields.toArray(idxdFields);
> > > > >         }
> > > > >         else {
> > > > >             assert type.valueTextIndex() || type.valueClass() == String.class;
> > > > >
> > > > >             idxdFields = new String[1];
> > > > >         }
> > > > >
> > > > >         idxdFields[idxdFields.length - 1] = VAL_STR_FIELD_NAME;
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * @return Cache object context.
> > > > >      */
> > > > >     private CacheObjectContext objectContext() {
> > > > >         if (ctx == null)
> > > > >             return null;
> > > > >
> > > > >         return ctx.cache().internalCache(cacheName).context().cacheObjectContext();
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * Stores given data in this fulltext index.
> > > > >      *
> > > > >      * @param k Key.
> > > > >      * @param v Value.
> > > > >      * @param ver Version.
> > > > >      * @param expires Expiration time.
> > > > >      * @throws IgniteCheckedException If failed.
> > > > >      */
> > > > >     @SuppressWarnings("ConstantConditions")
> > > > >     public void store(CacheObject k, CacheObject v, GridCacheVersion ver, long expires) throws IgniteCheckedException {
> > > > >         CacheObjectContext coctx = objectContext();
> > > > >
> > > > >         Object key = k.isPlatformType() ? k.value(coctx, false) : k;
> > > > >         Object val = v.isPlatformType() ? v.value(coctx, false) : v;
> > > > >
> > > > >         Document doc = new Document();
> > > > >
> > > > >         boolean stringsFound = false;
> > > > >
> > > > >         if (type.valueTextIndex() || type.valueClass() == String.class) {
> > > > >             doc.add(new TextField(VAL_STR_FIELD_NAME, val.toString(), Field.Store.YES));
> > > > >
> > > > >             stringsFound = true;
> > > > >         }
> > > > >
> > > > >         for (int i = 0, last = idxdFields.length - 1; i < last; i++) {
> > > > >             Object fieldVal = type.value(idxdFields[i], key, val);
> > > > >
> > > > >             if (fieldVal != null) {
> > > > >                 doc.add(new TextField(idxdFields[i], fieldVal.toString(), Field.Store.YES));
> > > > >
> > > > >                 stringsFound = true;
> > > > >             }
> > > > >         }
> > > > >
> > > > >         BytesRef keyByteRef = new BytesRef(k.valueBytes(coctx));
> > > > >
> > > > >         try {
> > > > >             final Term term = new Term(KEY_FIELD_NAME, keyByteRef);
> > > > >
> > > > >             if (!stringsFound) {
> > > > >                 writer.deleteDocuments(term);
> > > > >
> > > > >                 return; // No strings to index were found, so no data is stored.
> > > > >             }
> > > > >
> > > > >             doc.add(new StringField(KEY_FIELD_NAME, keyByteRef, Field.Store.YES));
> > > > >
> > > > >             if (type.valueClass() != String.class)
> > > > >                 doc.add(new StoredField(VAL_FIELD_NAME, v.valueBytes(coctx)));
> > > > >
> > > > >             doc.add(new StoredField(VER_FIELD_NAME, ver.toString().getBytes()));
> > > > >
> > > > >             doc.add(new LongField(EXPIRATION_TIME_FIELD_NAME, expires, Field.Store.YES));
> > > > >
> > > > >             // Atomically deletes any document matching the key term, then adds the new one.
> > > > >             writer.updateDocument(term, doc);
> > > > >         }
> > > > >         catch (IOException e) {
> > > > >             throw new IgniteCheckedException(e);
> > > > >         }
> > > > >         finally {
> > > > >             updateCntr.incrementAndGet();
> > > > >         }
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * Removes entry for given key from this index.
> > > > >      *
> > > > >      * @param key Key.
> > > > >      * @throws IgniteCheckedException If failed.
> > > > >      */
> > > > >     public void remove(CacheObject key) throws IgniteCheckedException {
> > > > >         try {
> > > > >             writer.deleteDocuments(new Term(KEY_FIELD_NAME,
> > > > >                 new BytesRef(key.valueBytes(objectContext()))));
> > > > >         }
> > > > >         catch (IOException e) {
> > > > >             throw new IgniteCheckedException(e);
> > > > >         }
> > > > >         finally {
> > > > >             updateCntr.incrementAndGet();
> > > > >         }
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * Runs Lucene fulltext query over this index.
> > > > >      *
> > > > >      * @param qry Query.
> > > > >      * @param filters Filters over result.
> > > > >      * @param pageSize Number of documents fetched from Lucene per batch.
> > > > >      * @return Query result.
> > > > >      * @throws IgniteCheckedException If failed.
> > > > >      */
> > > > >     public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(String qry, IndexingQueryFilter filters,
> > > > >         int pageSize) throws IgniteCheckedException {
> > > > >         IndexReader reader;
> > > > >
> > > > >         try {
> > > > >             long updates = updateCntr.get();
> > > > >
> > > > >             if (updates != 0) {
> > > > >                 writer.commit();
> > > > >
> > > > >                 updateCntr.addAndGet(-updates);
> > > > >             }
> > > > >
> > > > >             // We can cache the reader/searcher and change this to 'openIfChanged'.
> > > > >             reader = DirectoryReader.open(writer, true);
> > > > >         }
> > > > >         catch (IOException e) {
> > > > >             throw new IgniteCheckedException(e);
> > > > >         }
> > > > >
> > > > >         IndexSearcher searcher;
> > > > >
> > > > >         Query query;
> > > > >
> > > > >         try {
> > > > >             searcher = new IndexSearcher(reader);
> > > > >
> > > > >             MultiFieldQueryParser parser = new MultiFieldQueryParser(idxdFields,
> > > > >                 writer.getAnalyzer());
> > > > >
> > > > > //            parser.setAllowLeadingWildcard(true);
> > > > >
> > > > >             // Filter out expired items: keep only documents whose expiration time is after now.
> > > > >             Query filter = NumericRangeQuery.newLongRange(EXPIRATION_TIME_FIELD_NAME, U.currentTimeMillis(),
> > > > >                 null, false, false);
> > > > >
> > > > >             query = new BooleanQuery.Builder()
> > > > >                 .add(parser.parse(qry), BooleanClause.Occur.MUST)
> > > > >                 .add(filter, BooleanClause.Occur.FILTER)
> > > > >                 .build();
> > > > >         }
> > > > >         catch (Exception e) {
> > > > >             U.closeQuiet(reader);
> > > > >
> > > > >             throw new IgniteCheckedException(e);
> > > > >         }
> > > > >
> > > > >         IndexingQueryCacheFilter fltr = null;
> > > > >
> > > > >         if (filters != null)
> > > > >             fltr = filters.forCache(cacheName);
> > > > >
> > > > >         return new It<>(reader, searcher, query, fltr, pageSize);
> > > > >     }
> > > > >
> > > > >     /** {@inheritDoc} */
> > > > >     @Override public void close() {
> > > > >         U.closeQuiet(writer);
> > > > >         U.close(dir, ctx.log(GridLuceneIndex.class));
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * Key-value iterator over fulltext search result. Hits are fetched lazily, {@code pageSize} at a time.
> > > > >      */
> > > > >     private class It<K, V> extends GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
> > > > >         /** Value of {@code batchPos} before the first batch has been requested. */
> > > > >         private static final int BATCH_POS_BEFORE_HEAD = -1;
> > > > >
> > > > >         /** */
> > > > >         private static final long serialVersionUID = 0L;
> > > > >
> > > > >         /** Number of documents requested from Lucene per batch. */
> > > > >         private final int pageSize;
> > > > >
> > > > >         /** */
> > > > >         private final IndexReader reader;
> > > > >
> > > > >         /** */
> > > > >         private final Query query;
> > > > >
> > > > >         /** */
> > > > >         private final IndexSearcher searcher;
> > > > >
> > > > >         /** Current batch of hits; {@code null} before the first batch and after the last one. */
> > > > >         private ScoreDoc[] batch;
> > > > >
> > > > >         /** Position of the next unread hit in the current batch. */
> > > > >         private int batchPos = BATCH_POS_BEFORE_HEAD;
> > > > >
> > > > >         /** */
> > > > >         private final IndexingQueryCacheFilter filters;
> > > > >
> > > > >         /** */
> > > > >         private IgniteBiTuple<K, V> curr;
> > > > >
> > > > >         /** */
> > > > >         private CacheObjectContext coctx;
> > > > >
> > > > >         /**
> > > > >          * Constructor.
> > > > >          *
> > > > >          * @param reader Reader.
> > > > >          * @param searcher Searcher.
> > > > >          * @param query Query.
> > > > >          * @param filters Filters over result.
> > > > >          * @param pageSize Number of documents requested from Lucene per batch.
> > > > >          * @throws IgniteCheckedException if failed.
> > > > >          */
> > > > >         private It(IndexReader reader, IndexSearcher searcher, Query query, IndexingQueryCacheFilter filters,
> > > > >             int pageSize) throws IgniteCheckedException {
> > > > >             this.reader = reader;
> > > > >             this.searcher = searcher;
> > > > >             this.filters = filters;
> > > > >             this.query = query;
> > > > >             this.pageSize = pageSize;
> > > > >
> > > > >             coctx = objectContext();
> > > > >
> > > > >             findNext();
> > > > >         }
> > > > >
> > > > >         /**
> > > > >          * @param bytes Bytes.
> > > > >          * @param ldr Class loader.
> > > > >          * @return Object.
> > > > >          * @throws IgniteCheckedException If failed.
> > > > >          */
> > > > >         @SuppressWarnings("unchecked")
> > > > >         private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr) throws IgniteCheckedException {
> > > > >             if (coctx == null) // For tests.
> > > > >                 return (Z)JdbcUtils.deserialize(bytes, null);
> > > > >
> > > > >             return (Z)coctx.kernalContext().cacheObjects().unmarshal(coctx, bytes, ldr);
> > > > >         }
> > > > >
> > > > >         /**
> > > > >          * Finds next element, requesting further batches as needed.
> > > > >          *
> > > > >          * @throws IgniteCheckedException If failed.
> > > > >          */
> > > > >         @SuppressWarnings("unchecked")
> > > > >         private void findNext() throws IgniteCheckedException {
> > > > >             curr = null;
> > > > >
> > > > >             if (isClosed())
> > > > >                 throw new IgniteCheckedException("Iterator already closed");
> > > > >
> > > > >             // Keep requesting batches until a hit passes the filter or Lucene has no more hits.
> > > > >             // Otherwise a batch whose hits are all filtered out would end the iteration early.
> > > > >             while (true) {
> > > > >                 if (shouldRequestNextBatch()) {
> > > > >                     try {
> > > > >                         requestNextBatch();
> > > > >                     }
> > > > >                     catch (IOException e) {
> > > > >                         close();
> > > > >
> > > > >                         throw new IgniteCheckedException(e);
> > > > >                     }
> > > > >                 }
> > > > >
> > > > >                 if (batch == null)
> > > > >                     return;
> > > > >
> > > > >                 while (batchPos < batch.length) {
> > > > >                     Document doc;
> > > > >                     ScoreDoc scoreDoc = batch[batchPos++];
> > > > >
> > > > >                     try {
> > > > >                         doc = searcher.doc(scoreDoc.doc);
> > > > >                     }
> > > > >                     catch (IOException e) {
> > > > >                         throw new IgniteCheckedException(e);
> > > > >                     }
> > > > >
> > > > >                     ClassLoader ldr = null;
> > > > >
> > > > >                     if (ctx != null && ctx.deploy().enabled())
> > > > >                         ldr = ctx.cache().internalCache(cacheName).context().deploy().globalLoader();
> > > > >
> > > > >                     K k = unmarshall(doc.getBinaryValue(KEY_FIELD_NAME).bytes, ldr);
> > > > >
> > > > >                     if (filters != null && !filters.apply(k))
> > > > >                         continue;
> > > > >
> > > > >                     V v = type.valueClass() == String.class ?
> > > > >                         (V)doc.get(VAL_STR_FIELD_NAME) :
> > > > >                         this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME).bytes, ldr);
> > > > >
> > > > >                     assert v != null;
> > > > >
> > > > >                     curr = new IgniteBiTuple<>(k, v);
> > > > >
> > > > >                     return;
> > > > >                 }
> > > > >             }
> > > > >         }
> > > > >
> > > > >         /**
> > > > >          * @return {@code true} if the first batch has not been requested yet or the current batch is exhausted.
> > > > >          */
> > > > >         private boolean shouldRequestNextBatch() {
> > > > >             if (batch == null) {
> > > > >                 // Request the first batch once; after the last (empty) batch, batchPos is 0, so stop.
> > > > >                 return batchPos == BATCH_POS_BEFORE_HEAD;
> > > > >             }
> > > > >             else {
> > > > >                 // Request the next batch once the current one has been fully consumed.
> > > > >                 return batchPos == batch.length;
> > > > >             }
> > > > >         }
> > > > >
> > > > >         /**
> > > > >          * Fetches the next {@code pageSize} hits, continuing after the last hit of the current batch.
> > > > >          * Sets {@code batch} to {@code null} when there are no more hits.
> > > > >          *
> > > > >          * @throws IOException If failed.
> > > > >          */
> > > > >         private void requestNextBatch() throws IOException {
> > > > >             TopDocs docs;
> > > > >
> > > > >             if (batch == null)
> > > > >                 docs = searcher.search(query, pageSize);
> > > > >             else
> > > > >                 docs = searcher.searchAfter(batch[batch.length - 1], query, pageSize);
> > > > >
> > > > >             batch = docs.scoreDocs.length == 0 ? null : docs.scoreDocs;
> > > > >
> > > > >             batchPos = 0;
> > > > >         }
> > > > >
> > > > >         /** {@inheritDoc} */
> > > > >         @Override protected IgniteBiTuple<K, V> onNext() throws IgniteCheckedException {
> > > > >             IgniteBiTuple<K, V> res = curr;
> > > > >
> > > > >             findNext();
> > > > >
> > > > >             return res;
> > > > >         }
> > > > >
> > > > >         /** {@inheritDoc} */
> > > > >         @Override protected boolean onHasNext() throws IgniteCheckedException {
> > > > >             return curr != null;
> > > > >         }
> > > > >
> > > > >         /** {@inheritDoc} */
> > > > >         @Override protected void onClose() throws IgniteCheckedException {
> > > > >             U.closeQuiet(reader);
> > > > >         }
> > > > >     }
> > > > > }
> > > > > ```
> > > > >
> > > > > On Fri, Sep 7, 2018 at 9:02 AM Tâm Nguyễn Mạnh <nguyenmanhtam123@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I tried to implement an iterator for GridLuceneIndex. Could you please
> > > > > > help review it?
> > > > > >
> > > > > > --
> > > > > > Thanks & Best Regards
> > > > > >
> > > > > > Tam, Nguyen Manh
> > > > > >
> > > > > >
> > > > >
> > > > > --
> > > > > Thanks & Best Regards
> > > > >
> > > > > Tam, Nguyen Manh
> > > > >
> > > >
> > >
> > >
> > > --
> > > Thanks & Best Regards
> > >
> > > Tam, Nguyen Manh
> > >
> >
>
>
> --
> Thanks & Best Regards
>
> Tam, Nguyen Manh
>
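
The `updateCntr` bookkeeping in `query()` commits the `IndexWriter` only when writes happened since the last commit, and it subtracts the count it observed instead of resetting the counter to zero. A write whose increment lands after that read therefore leaves the counter non-zero, and the next query commits again. The pattern can be sketched with the standard library alone; this is an illustration under stated assumptions, where `commit()` is a hypothetical placeholder for `IndexWriter.commit()` and `LazyCommitSketch` and its methods are illustrative names, not Ignite API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch of the "commit only if dirty" bookkeeping done with updateCntr in query(). */
public class LazyCommitSketch {
    /** Writes not yet covered by a commit (mirrors updateCntr). */
    private final AtomicLong updateCntr = new AtomicLong();

    /** Counts simulated commits so the behavior can be observed. */
    private final AtomicInteger commits = new AtomicInteger();

    /** Stands in for store()/remove(): each write marks the index dirty. */
    public void write() {
        updateCntr.incrementAndGet();
    }

    /** Stands in for the start of query(): commit only if there are pending writes. */
    public void beforeQuery() {
        long updates = updateCntr.get();

        if (updates != 0) {
            commit();

            // Subtract the observed snapshot instead of set(0): a write counted after get()
            // keeps the counter non-zero, so the next query commits again.
            updateCntr.addAndGet(-updates);
        }
    }

    /** Hypothetical placeholder for IndexWriter.commit(). */
    private void commit() {
        commits.incrementAndGet();
    }

    public int commits() {
        return commits.get();
    }

    public long pending() {
        return updateCntr.get();
    }

    public static void main(String[] args) {
        LazyCommitSketch idx = new LazyCommitSketch();

        idx.beforeQuery(); // Nothing pending: no commit.
        idx.write();
        idx.write();
        idx.beforeQuery(); // Two pending writes: one commit.
        idx.beforeQuery(); // Clean again: no commit.

        System.out.println(idx.commits() + " commit(s), " + idx.pending() + " pending");
    }
}
```

Running `main` prints `1 commit(s), 0 pending`. An extra commit caused by a racing write is harmless, while resetting the counter to zero could lose a pending write until some later update.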

Re: IGNITE-7482 Cursor in TextQuery fetches all data in first call to next() or hasNext()

Posted by Tâm Nguyễn Mạnh <ng...@gmail.com>.
Hi,

I just registered. Here is my Jira account: nguyenmanhtam123@gmail.com

Thank you,
Tamnm

On Mon, Sep 10, 2018 at 3:25 PM Alexey Goncharuk <al...@gmail.com> wrote:

> Hi,
>
> Please send your Jira account ID so we can add you to the contributors
> list. Then you will be able to assign tickets to yourself and contribute to
> the project according to the process.
>
> You can get more info here:
>
> https://cwiki.apache.org/confluence/display/IGNITE/Development+Process
> https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute
>
> --AG
>
> On Mon, Sep 10, 2018 at 9:16, Tâm Nguyễn Mạnh <ng...@gmail.com> wrote:
>
> > Hi,
> > I have not been assigned yet, but I really want to be.
> >
> > On Fri, Sep 7, 2018 at 4:13 PM Ilya Kasnacheev <ilya.kasnacheev@gmail.com> wrote:
> >
> > > Hello!
> > >
> > > Can you please frame it as a GitHub pull request, as per our process? Do
> > > you have a ticket for that?
> > >
> > > Regards,
> > > --
> > > Ilya Kasnacheev
> > >
> > >
> > > On Fri, Sep 7, 2018 at 5:08, Tâm Nguyễn Mạnh <nguyenmanhtam123@gmail.com> wrote:
> > >
> > > >
> > > > modules\indexing\src\main\java\org\apache\ignite\internal\processors\query\h2\opt\GridLuceneIndex.java
> > > > ```java
> > > > /*
> > > >  * Licensed to the Apache Software Foundation (ASF) under one or more
> > > >  * contributor license agreements.  See the NOTICE file distributed with
> > > >  * this work for additional information regarding copyright ownership.
> > > >  * The ASF licenses this file to You under the Apache License, Version 2.0
> > > >  * (the "License"); you may not use this file except in compliance with
> > > >  * the License.  You may obtain a copy of the License at
> > > >  *
> > > >  *      http://www.apache.org/licenses/LICENSE-2.0
> > > >  *
> > > >  * Unless required by applicable law or agreed to in writing, software
> > > >  * distributed under the License is distributed on an "AS IS" BASIS,
> > > >  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > > >  * See the License for the specific language governing permissions and
> > > >  * limitations under the License.
> > > >  */
> > > >
> > > > package org.apache.ignite.internal.processors.query.h2.opt;
> > > >
> > > > import java.io.IOException;
> > > > import java.util.Collection;
> > > > import java.util.concurrent.atomic.AtomicLong;
> > > > import org.apache.ignite.IgniteCheckedException;
> > > > import org.apache.ignite.internal.GridKernalContext;
> > > > import org.apache.ignite.internal.processors.cache.CacheObject;
> > > > import org.apache.ignite.internal.processors.cache.CacheObjectContext;
> > > > import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
> > > > import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
> > > > import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
> > > > import org.apache.ignite.internal.util.GridAtomicLong;
> > > > import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
> > > > import org.apache.ignite.internal.util.lang.GridCloseableIterator;
> > > > import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
> > > > import org.apache.ignite.internal.util.typedef.internal.U;
> > > > import org.apache.ignite.lang.IgniteBiTuple;
> > > > import org.apache.ignite.spi.indexing.IndexingQueryFilter;
> > > > import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
> > > > import org.apache.lucene.analysis.standard.StandardAnalyzer;
> > > > import org.apache.lucene.document.Document;
> > > > import org.apache.lucene.document.Field;
> > > > import org.apache.lucene.document.LongField;
> > > > import org.apache.lucene.document.StoredField;
> > > > import org.apache.lucene.document.StringField;
> > > > import org.apache.lucene.document.TextField;
> > > > import org.apache.lucene.index.DirectoryReader;
> > > > import org.apache.lucene.index.IndexReader;
> > > > import org.apache.lucene.index.IndexWriter;
> > > > import org.apache.lucene.index.IndexWriterConfig;
> > > > import org.apache.lucene.index.Term;
> > > > import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
> > > > import org.apache.lucene.search.BooleanClause;
> > > > import org.apache.lucene.search.BooleanQuery;
> > > > import org.apache.lucene.search.IndexSearcher;
> > > > import org.apache.lucene.search.NumericRangeQuery;
> > > > import org.apache.lucene.search.Query;
> > > > import org.apache.lucene.search.ScoreDoc;
> > > > import org.apache.lucene.search.TopDocs;
> > > > import org.apache.lucene.util.BytesRef;
> > > > import org.h2.util.JdbcUtils;
> > > > import org.jetbrains.annotations.Nullable;
> > > >
> > > > import static org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
> > > > import static org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME;
> > > >
> > > > /**
> > > >  * Lucene fulltext index.
> > > >  */
> > > > public class GridLuceneIndex implements AutoCloseable {
> > > >     /** Field name for string representation of value. */
> > > >     public static final String VAL_STR_FIELD_NAME = "_gg_val_str__";
> > > >
> > > >     /** Field name for value version. */
> > > >     public static final String VER_FIELD_NAME = "_gg_ver__";
> > > >
> > > >     /** Field name for value expiration time. */
> > > >     public static final String EXPIRATION_TIME_FIELD_NAME = "_gg_expires__";
> > > >
> > > >     /** */
> > > >     private final String cacheName;
> > > >
> > > >     /** */
> > > >     private final GridQueryTypeDescriptor type;
> > > >
> > > >     /** */
> > > >     private final IndexWriter writer;
> > > >
> > > >     /** */
> > > >     private final String[] idxdFields;
> > > >
> > > >     /** */
> > > >     private final AtomicLong updateCntr = new GridAtomicLong();
> > > >
> > > >     /** */
> > > >     private final GridLuceneDirectory dir;
> > > >
> > > >     /** */
> > > >     private final GridKernalContext ctx;
> > > >
> > > >     /**
> > > >      * Constructor.
> > > >      *
> > > >      * @param ctx Kernal context.
> > > >      * @param cacheName Cache name.
> > > >      * @param type Type descriptor.
> > > >      * @throws IgniteCheckedException If failed.
> > > >      */
> > > >     public GridLuceneIndex(GridKernalContext ctx, @Nullable String cacheName, GridQueryTypeDescriptor type)
> > > >         throws IgniteCheckedException {
> > > >         this.ctx = ctx;
> > > >         this.cacheName = cacheName;
> > > >         this.type = type;
> > > >
> > > >         dir = new GridLuceneDirectory(new GridUnsafeMemory(0));
> > > >
> > > >         try {
> > > >             writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()));
> > > >         }
> > > >         catch (IOException e) {
> > > >             throw new IgniteCheckedException(e);
> > > >         }
> > > >
> > > >         GridQueryIndexDescriptor idx = type.textIndex();
> > > >
> > > >         if (idx != null) {
> > > >             Collection<String> fields = idx.fields();
> > > >
> > > >             idxdFields = new String[fields.size() + 1];
> > > >
> > > >             fields.toArray(idxdFields);
> > > >         }
> > > >         else {
> > > >             assert type.valueTextIndex() || type.valueClass() == String.class;
> > > >
> > > >             idxdFields = new String[1];
> > > >         }
> > > >
> > > >         idxdFields[idxdFields.length - 1] = VAL_STR_FIELD_NAME;
> > > >     }
> > > >
> > > >     /**
> > > >      * @return Cache object context.
> > > >      */
> > > >     private CacheObjectContext objectContext() {
> > > >         if (ctx == null)
> > > >             return null;
> > > >
> > > >         return
> > > > ctx.cache().internalCache(cacheName).context().cacheObjectContext();
> > > >     }
> > > >
> > > >     /**
> > > >      * Stores given data in this fulltext index.
> > > >      *
> > > >      * @param k Key.
> > > >      * @param v Value.
> > > >      * @param ver Version.
> > > >      * @param expires Expiration time.
> > > >      * @throws IgniteCheckedException If failed.
> > > >      */
> > > >     @SuppressWarnings("ConstantConditions")
> > > >     public void store(CacheObject k, CacheObject v, GridCacheVersion
> > ver,
> > > > long expires) throws IgniteCheckedException {
> > > >         CacheObjectContext coctx = objectContext();
> > > >
> > > >         Object key = k.isPlatformType() ? k.value(coctx, false) : k;
> > > >         Object val = v.isPlatformType() ? v.value(coctx, false) : v;
> > > >
> > > >         Document doc = new Document();
> > > >
> > > >         boolean stringsFound = false;
> > > >
> > > >         if (type.valueTextIndex() || type.valueClass() == String.class) {
> > > >             doc.add(new TextField(VAL_STR_FIELD_NAME, val.toString(), Field.Store.YES));
> > > >
> > > >             stringsFound = true;
> > > >         }
> > > >
> > > >         for (int i = 0, last = idxdFields.length - 1; i < last; i++) {
> > > >             Object fieldVal = type.value(idxdFields[i], key, val);
> > > >
> > > >             if (fieldVal != null) {
> > > >                 doc.add(new TextField(idxdFields[i], fieldVal.toString(), Field.Store.YES));
> > > >
> > > >                 stringsFound = true;
> > > >             }
> > > >         }
> > > >
> > > >         BytesRef keyByteRef = new BytesRef(k.valueBytes(coctx));
> > > >
> > > >         try {
> > > >             final Term term = new Term(KEY_FIELD_NAME, keyByteRef);
> > > >
> > > >             if (!stringsFound) {
> > > >                 writer.deleteDocuments(term);
> > > >
> > > >                 return; // We did not find any strings to be indexed, will not store data at all.
> > > >             }
> > > >
> > > >             doc.add(new StringField(KEY_FIELD_NAME, keyByteRef, Field.Store.YES));
> > > >
> > > >             if (type.valueClass() != String.class)
> > > >                 doc.add(new StoredField(VAL_FIELD_NAME, v.valueBytes(coctx)));
> > > >
> > > >             doc.add(new StoredField(VER_FIELD_NAME, ver.toString().getBytes()));
> > > >
> > > >             doc.add(new LongField(EXPIRATION_TIME_FIELD_NAME, expires, Field.Store.YES));
> > > >
> > > >             // updateDocument() atomically deletes the old document for this key and adds the new one.
> > > >             writer.updateDocument(term, doc);
> > > >         }
> > > >         catch (IOException e) {
> > > >             throw new IgniteCheckedException(e);
> > > >         }
> > > >         finally {
> > > >             updateCntr.incrementAndGet();
> > > >         }
> > > >     }
> > > >
> > > >     /**
> > > >      * Removes entry for given key from this index.
> > > >      *
> > > >      * @param key Key.
> > > >      * @throws IgniteCheckedException If failed.
> > > >      */
> > > >     public void remove(CacheObject key) throws IgniteCheckedException {
> > > >         try {
> > > >             writer.deleteDocuments(new Term(KEY_FIELD_NAME,
> > > >                 new BytesRef(key.valueBytes(objectContext()))));
> > > >         }
> > > >         catch (IOException e) {
> > > >             throw new IgniteCheckedException(e);
> > > >         }
> > > >         finally {
> > > >             updateCntr.incrementAndGet();
> > > >         }
> > > >     }
> > > >
> > > >     /**
> > > >      * Runs lucene fulltext query over this index.
> > > >      *
> > > >      * @param qry Query.
> > > >      * @param filters Filters over result.
> > > >      * @param pageSize Number of documents to fetch per batch.
> > > >      * @return Query result.
> > > >      * @throws IgniteCheckedException If failed.
> > > >      */
> > > >     public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(String qry,
> > > >         IndexingQueryFilter filters, int pageSize) throws IgniteCheckedException {
> > > >         IndexReader reader;
> > > >
> > > >         try {
> > > >             long updates = updateCntr.get();
> > > >
> > > >             if (updates != 0) {
> > > >                 writer.commit();
> > > >
> > > >                 updateCntr.addAndGet(-updates);
> > > >             }
> > > >
> > > >             // We could cache the reader/searcher and switch this to 'openIfChanged'.
> > > >             reader = DirectoryReader.open(writer, true);
> > > >         }
> > > >         catch (IOException e) {
> > > >             throw new IgniteCheckedException(e);
> > > >         }
> > > >
> > > >         IndexSearcher searcher;
> > > >
> > > >         Query query;
> > > >
> > > >         try {
> > > >             searcher = new IndexSearcher(reader);
> > > >
> > > >             MultiFieldQueryParser parser = new MultiFieldQueryParser(idxdFields,
> > > >                 writer.getAnalyzer());
> > > >
> > > > //            parser.setAllowLeadingWildcard(true);
> > > >
> > > >             // Filter expired items.
> > > >             Query filter = NumericRangeQuery.newLongRange(EXPIRATION_TIME_FIELD_NAME,
> > > >                 U.currentTimeMillis(), null, false, false);
> > > >
> > > >             query = new BooleanQuery.Builder()
> > > >                 .add(parser.parse(qry), BooleanClause.Occur.MUST)
> > > >                 .add(filter, BooleanClause.Occur.FILTER)
> > > >                 .build();
> > > >         }
> > > >         catch (Exception e) {
> > > >             U.closeQuiet(reader);
> > > >
> > > >             throw new IgniteCheckedException(e);
> > > >         }
> > > >
> > > >         IndexingQueryCacheFilter fltr = null;
> > > >
> > > >         if (filters != null)
> > > >             fltr = filters.forCache(cacheName);
> > > >
> > > >         return new It<>(reader, searcher, query, fltr, pageSize);
> > > >     }
> > > >
> > > >     /** {@inheritDoc} */
> > > >     @Override public void close() {
> > > >         U.closeQuiet(writer);
> > > >         U.close(dir, ctx.log(GridLuceneIndex.class));
> > > >     }
> > > >
> > > >     /**
> > > >      * Key-value iterator over fulltext search result.
> > > >      */
> > > >     private class It<K, V> extends GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
> > > >         /** Value of {@code batchPos} before the first batch is requested. */
> > > >         private static final int BatchPosBeforeHead = -1;
> > > >
> > > >         /** */
> > > >         private static final long serialVersionUID = 0L;
> > > >
> > > >         /** */
> > > >         private final int pageSize;
> > > >
> > > >         /** */
> > > >         private final IndexReader reader;
> > > >
> > > >         /** */
> > > >         private final Query query;
> > > >
> > > >         /** */
> > > >         private final IndexSearcher searcher;
> > > >
> > > >         /** Current batch of hits. */
> > > >         private ScoreDoc[] batch;
> > > >
> > > >         /** Position of the next hit to read in {@code batch}. */
> > > >         private int batchPos = BatchPosBeforeHead;
> > > >
> > > >         /** */
> > > >         private final IndexingQueryCacheFilter filters;
> > > >
> > > >         /** */
> > > >         private IgniteBiTuple<K, V> curr;
> > > >
> > > >         /** */
> > > >         private CacheObjectContext coctx;
> > > >
> > > >         /**
> > > >          * Constructor.
> > > >          *
> > > >          * @param reader Reader.
> > > >          * @param searcher Searcher.
> > > >          * @param query Query.
> > > >          * @param filters Filters over result.
> > > >          * @param pageSize Number of hits to fetch per batch.
> > > >          * @throws IgniteCheckedException if failed.
> > > >          */
> > > >         private It(IndexReader reader, IndexSearcher searcher, Query query,
> > > >             IndexingQueryCacheFilter filters, int pageSize) throws IgniteCheckedException {
> > > >             this.reader = reader;
> > > >             this.searcher = searcher;
> > > >             this.filters = filters;
> > > >             this.query = query;
> > > >             this.pageSize = pageSize;
> > > >
> > > >             coctx = objectContext();
> > > >
> > > >             findNext();
> > > >         }
> > > >
> > > >         /**
> > > >          * @param bytes Bytes.
> > > >          * @param ldr Class loader.
> > > >          * @return Object.
> > > >          * @throws IgniteCheckedException If failed.
> > > >          */
> > > >         @SuppressWarnings("unchecked")
> > > >         private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr) throws IgniteCheckedException {
> > > >             if (coctx == null) // For tests.
> > > >                 return (Z)JdbcUtils.deserialize(bytes, null);
> > > >
> > > >             return (Z)coctx.kernalContext().cacheObjects().unmarshal(coctx, bytes, ldr);
> > > >         }
> > > >
> > > >         /**
> > > >          * Finds next element.
> > > >          *
> > > >          * @throws IgniteCheckedException If failed.
> > > >          */
> > > >         @SuppressWarnings("unchecked")
> > > >         private void findNext() throws IgniteCheckedException {
> > > >             curr = null;
> > > >
> > > >             if (isClosed())
> > > >                 throw new IgniteCheckedException("Iterator already closed");
> > > >
> > > >             if (shouldRequestNextBatch()) {
> > > >                 try {
> > > >                     requestNextBatch();
> > > >                 } catch (IOException e) {
> > > >                     close();
> > > >                     throw new IgniteCheckedException(e);
> > > >                 }
> > > >             }
> > > >
> > > >             if (batch == null)
> > > >                 return;
> > > >
> > > >             while (batchPos < batch.length) {
> > > >                 Document doc;
> > > >                 ScoreDoc scoreDoc = batch[batchPos++];
> > > >
> > > >                 try {
> > > >                     doc = searcher.doc(scoreDoc.doc);
> > > >                 }
> > > >                 catch (IOException e) {
> > > >                     throw new IgniteCheckedException(e);
> > > >                 }
> > > >
> > > >                 ClassLoader ldr = null;
> > > >
> > > >                 if (ctx != null && ctx.deploy().enabled())
> > > >                     ldr = ctx.cache().internalCache(cacheName).context().deploy().globalLoader();
> > > >
> > > >                 K k = unmarshall(doc.getBinaryValue(KEY_FIELD_NAME).bytes, ldr);
> > > >
> > > >                 if (filters != null && !filters.apply(k))
> > > >                     continue;
> > > >
> > > >                 V v = type.valueClass() == String.class ?
> > > >                     (V)doc.get(VAL_STR_FIELD_NAME) :
> > > >                     this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME).bytes, ldr);
> > > >
> > > >                 assert v != null;
> > > >
> > > >                 curr = new IgniteBiTuple<>(k, v);
> > > >
> > > >                 break;
> > > >             }
> > > >         }
> > > >
> > > >         /**
> > > >          * @return {@code true} if the first batch has not been fetched yet or the current batch is exhausted.
> > > >          */
> > > >         private boolean shouldRequestNextBatch() {
> > > >             if (batch == null) {
> > > >                 // Request the first batch.
> > > >                 return batchPos == BatchPosBeforeHead;
> > > >             }
> > > >             else {
> > > >                 // Request the next batch once the end of the current one is reached.
> > > >                 return batchPos == batch.length;
> > > >             }
> > > >         }
> > > >
> > > >         /**
> > > >          * Fetches the next batch of hits, or the first one if nothing has been fetched yet.
> > > >          *
> > > >          * @throws IOException If failed.
> > > >          */
> > > >         private void requestNextBatch() throws IOException {
> > > >             TopDocs docs;
> > > >
> > > >             if (batch == null)
> > > >                 docs = searcher.search(query, pageSize);
> > > >             else
> > > >                 docs = searcher.searchAfter(batch[batch.length - 1], query, pageSize);
> > > >
> > > >             batch = docs.scoreDocs.length == 0 ? null : docs.scoreDocs;
> > > >
> > > >             batchPos = 0;
> > > >         }
> > > >
> > > >         /** {@inheritDoc} */
> > > >         @Override protected IgniteBiTuple<K, V> onNext() throws IgniteCheckedException {
> > > >             IgniteBiTuple<K, V> res = curr;
> > > >
> > > >             findNext();
> > > >
> > > >             return res;
> > > >         }
> > > >
> > > >         /** {@inheritDoc} */
> > > >         @Override protected boolean onHasNext() throws IgniteCheckedException {
> > > >             return curr != null;
> > > >         }
> > > >
> > > >         /** {@inheritDoc} */
> > > >         @Override protected void onClose() throws IgniteCheckedException {
> > > >             U.closeQuiet(reader);
> > > >         }
> > > >     }
> > > > }
> > > > ```
> > > >
> > > > On Fri, Sep 7, 2018 at 9:02 AM Tâm Nguyễn Mạnh <nguyenmanhtam123@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I tried to implement an iterator for GridLuceneIndex. Could you please
> > > > > help review it?
> > > > >
> > > > > --
> > > > > Thanks & Best Regards
> > > > >
> > > > > Tam, Nguyen Manh
> > > > >
> > > > >
> > > >
> > > > --
> > > > Thanks & Best Regards
> > > >
> > > > Tam, Nguyen Manh
> > > >
> > >
> >
> >
> > --
> > Thanks & Best Regards
> >
> > Tam, Nguyen Manh
> >
>


-- 
Thanks & Best Regards

Tam, Nguyen Manh
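One point that may be worth raising when reviewing the quoted `It.findNext()`: the `while (batchPos < batch.length)` loop skips hits rejected by `filters`, but if every hit in the current batch is rejected, the loop ends with `curr == null`, so `hasNext()` reports the end of the results even though `searchAfter` could still return further batches. Below is a minimal, self-contained sketch of a lazy paged iterator that keeps requesting batches until it finds an accepted element or a fetch comes back empty. It does not use Lucene: the hypothetical `PageFetcher` interface stands in for `IndexSearcher.search(Query, int)` and `IndexSearcher.searchAfter(ScoreDoc, Query, int)` (with `after == null` meaning "first page"), and `PagedFilteringIterator` is an illustrative name, not an Ignite class. It assumes the source never yields `null` elements.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Hypothetical stand-in for IndexSearcher.search / searchAfter. */
interface PageFetcher<T> {
    /** Returns up to pageSize items that come after {@code after}; {@code after == null} means the first page. */
    List<T> fetch(T after, int pageSize);
}

/** Lazily pages through results, skipping items rejected by the filter. */
class PagedFilteringIterator<T> implements Iterator<T> {
    private final PageFetcher<T> fetcher;
    private final Predicate<T> filter;
    private final int pageSize;

    private List<T> batch;      // Current page; null before the first fetch.
    private int batchPos;       // Index of the next item to examine in batch.
    private boolean exhausted;  // True once a fetch returned an empty page.
    private T next;             // Prefetched accepted element, or null if none.

    PagedFilteringIterator(PageFetcher<T> fetcher, Predicate<T> filter, int pageSize) {
        this.fetcher = fetcher;
        this.filter = filter;
        this.pageSize = pageSize;
        advance();
    }

    /** Finds the next accepted item, fetching as many pages as needed. */
    private void advance() {
        next = null;

        while (!exhausted) {
            // Fetch a new page before the first read or when the current page is used up.
            if (batch == null || batchPos == batch.size()) {
                T after = batch == null ? null : batch.get(batch.size() - 1);
                List<T> page = fetcher.fetch(after, pageSize);

                if (page.isEmpty()) {
                    exhausted = true;
                    return;
                }

                batch = page;
                batchPos = 0;
            }

            T candidate = batch.get(batchPos++);

            // A rejected item does not end iteration; keep scanning, across pages if needed.
            if (filter.test(candidate)) {
                next = candidate;
                return;
            }
        }
    }

    @Override public boolean hasNext() {
        return next != null;
    }

    @Override public T next() {
        if (next == null)
            throw new NoSuchElementException();

        T res = next;
        advance();
        return res;
    }
}
```

For example, with items 1 to 10, a page size of 3 and a filter accepting only values of 8 or more, the first two pages are rejected entirely, yet the iterator still yields 8, 9 and 10, whereas an implementation that stops at the end of the first fully filtered batch would yield nothing.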

Re: IGNITE-7482 Cursor in TextQuery fetches all data in first call to next() or hasNext()

Posted by Alexey Goncharuk <al...@gmail.com>.
Hi,

Please send your jira account ID so we can add you to the contributors
list. Then you will be able to assign tickets to yourself and contribute to
the project according to the process.

You can get more info here:

https://cwiki.apache.org/confluence/display/IGNITE/Development+Process
https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute

--AG

On Mon, Sep 10, 2018 at 9:16, Tâm Nguyễn Mạnh <ng...@gmail.com> wrote:

> Hi,
> I have not been assigned yet, but I really want to be.
>
> On Fri, Sep 7, 2018 at 4:13 PM Ilya Kasnacheev <il...@gmail.com>
> wrote:
>
> > Hello!
> >
> > Can you please submit it as a GitHub pull request, as per our process? Do
> > you have a ticket for that?
> >
> > Regards,
> > --
> > Ilya Kasnacheev
> >
> >
> > пт, 7 сент. 2018 г. в 5:08, Tâm Nguyễn Mạnh <nguyenmanhtam123@gmail.com
> >:
> >
> > >
> > >
> >
> modules\indexing\src\main\java\org\apache\ignite\internal\processors\query\h2\opt\GridLuceneIndex.java
> > > ```java
> > > /*
> > >  * Licensed to the Apache Software Foundation (ASF) under one or more
> > >  * contributor license agreements.  See the NOTICE file distributed
> with
> > >  * this work for additional information regarding copyright ownership.
> > >  * The ASF licenses this file to You under the Apache License, Version
> > 2.0
> > >  * (the "License"); you may not use this file except in compliance with
> > >  * the License.  You may obtain a copy of the License at
> > >  *
> > >  *      http://www.apache.org/licenses/LICENSE-2.0
> > >  *
> > >  * Unless required by applicable law or agreed to in writing, software
> > >  * distributed under the License is distributed on an "AS IS" BASIS,
> > >  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> > implied.
> > >  * See the License for the specific language governing permissions and
> > >  * limitations under the License.
> > >  */
> > >
> > > package org.apache.ignite.internal.processors.query.h2.opt;
> > >
> > > import java.io.IOException;
> > > import java.util.Collection;
> > > import java.util.concurrent.atomic.AtomicLong;
> > > import org.apache.ignite.IgniteCheckedException;
> > > import org.apache.ignite.internal.GridKernalContext;
> > > import org.apache.ignite.internal.processors.cache.CacheObject;
> > > import org.apache.ignite.internal.processors.cache.CacheObjectContext;
> > > import
> > > org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
> > > import
> > > org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
> > > import
> > org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
> > > import org.apache.ignite.internal.util.GridAtomicLong;
> > > import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
> > > import org.apache.ignite.internal.util.lang.GridCloseableIterator;
> > > import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
> > > import org.apache.ignite.internal.util.typedef.internal.U;
> > > import org.apache.ignite.lang.IgniteBiTuple;
> > > import org.apache.ignite.spi.indexing.IndexingQueryFilter;
> > > import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
> > > import org.apache.lucene.analysis.standard.StandardAnalyzer;
> > > import org.apache.lucene.document.Document;
> > > import org.apache.lucene.document.Field;
> > > import org.apache.lucene.document.LongField;
> > > import org.apache.lucene.document.StoredField;
> > > import org.apache.lucene.document.StringField;
> > > import org.apache.lucene.document.TextField;
> > > import org.apache.lucene.index.DirectoryReader;
> > > import org.apache.lucene.index.IndexReader;
> > > import org.apache.lucene.index.IndexWriter;
> > > import org.apache.lucene.index.IndexWriterConfig;
> > > import org.apache.lucene.index.Term;
> > > import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
> > > import org.apache.lucene.search.BooleanClause;
> > > import org.apache.lucene.search.BooleanQuery;
> > > import org.apache.lucene.search.IndexSearcher;
> > > import org.apache.lucene.search.NumericRangeQuery;
> > > import org.apache.lucene.search.Query;
> > > import org.apache.lucene.search.ScoreDoc;
> > > import org.apache.lucene.search.TopDocs;
> > > import org.apache.lucene.util.BytesRef;
> > > import org.h2.util.JdbcUtils;
> > > import org.jetbrains.annotations.Nullable;
> > >
> > > import static
> > > org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
> > > import static
> > > org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME;
> > >
> > > /**
> > >  * Lucene fulltext index.
> > >  */
> > > public class GridLuceneIndex implements AutoCloseable {
> > >     /** Field name for string representation of value. */
> > >     public static final String VAL_STR_FIELD_NAME = "_gg_val_str__";
> > >
> > >     /** Field name for value version. */
> > >     public static final String VER_FIELD_NAME = "_gg_ver__";
> > >
> > >     /** Field name for value expiration time. */
> > >     public static final String EXPIRATION_TIME_FIELD_NAME =
> > > "_gg_expires__";
> > >
> > >     /** */
> > >     private final String cacheName;
> > >
> > >     /** */
> > >     private final GridQueryTypeDescriptor type;
> > >
> > >     /** */
> > >     private final IndexWriter writer;
> > >
> > >     /** */
> > >     private final String[] idxdFields;
> > >
> > >     /** */
> > >     private final AtomicLong updateCntr = new GridAtomicLong();
> > >
> > >     /** */
> > >     private final GridLuceneDirectory dir;
> > >
> > >     /** */
> > >     private final GridKernalContext ctx;
> > >
> > >     /**
> > >      * Constructor.
> > >      *
> > >      * @param ctx Kernal context.
> > >      * @param cacheName Cache name.
> > >      * @param type Type descriptor.
> > >      * @throws IgniteCheckedException If failed.
> > >      */
> > >     public GridLuceneIndex(GridKernalContext ctx, @Nullable String
> > > cacheName, GridQueryTypeDescriptor type)
> > >         throws IgniteCheckedException {
> > >         this.ctx = ctx;
> > >         this.cacheName = cacheName;
> > >         this.type = type;
> > >
> > >         dir = new GridLuceneDirectory(new GridUnsafeMemory(0));
> > >
> > >         try {
> > >             writer = new IndexWriter(dir, new IndexWriterConfig(new
> > > StandardAnalyzer()));
> > >         }
> > >         catch (IOException e) {
> > >             throw new IgniteCheckedException(e);
> > >         }
> > >
> > >         GridQueryIndexDescriptor idx = type.textIndex();
> > >
> > >         if (idx != null) {
> > >             Collection<String> fields = idx.fields();
> > >
> > >             idxdFields = new String[fields.size() + 1];
> > >
> > >             fields.toArray(idxdFields);
> > >         }
> > >         else {
> > >             assert type.valueTextIndex() || type.valueClass() ==
> > > String.class;
> > >
> > >             idxdFields = new String[1];
> > >         }
> > >
> > >         idxdFields[idxdFields.length - 1] = VAL_STR_FIELD_NAME;
> > >     }
> > >
> > >     /**
> > >      * @return Cache object context.
> > >      */
> > >     private CacheObjectContext objectContext() {
> > >         if (ctx == null)
> > >             return null;
> > >
> > >         return
> > > ctx.cache().internalCache(cacheName).context().cacheObjectContext();
> > >     }
> > >
> > >     /**
> > >      * Stores given data in this fulltext index.
> > >      *
> > >      * @param k Key.
> > >      * @param v Value.
> > >      * @param ver Version.
> > >      * @param expires Expiration time.
> > >      * @throws IgniteCheckedException If failed.
> > >      */
> > >     @SuppressWarnings("ConstantConditions")
> > >     public void store(CacheObject k, CacheObject v, GridCacheVersion
> ver,
> > > long expires) throws IgniteCheckedException {
> > >         CacheObjectContext coctx = objectContext();
> > >
> > >         Object key = k.isPlatformType() ? k.value(coctx, false) : k;
> > >         Object val = v.isPlatformType() ? v.value(coctx, false) : v;
> > >
> > >         Document doc = new Document();
> > >
> > >         boolean stringsFound = false;
> > >
> > >         if (type.valueTextIndex() || type.valueClass() ==
> String.class) {
> > >             doc.add(new TextField(VAL_STR_FIELD_NAME, val.toString(),
> > > Field.Store.YES));
> > >
> > >             stringsFound = true;
> > >         }
> > >
> > >         for (int i = 0, last = idxdFields.length - 1; i < last; i++) {
> > >             Object fieldVal = type.value(idxdFields[i], key, val);
> > >
> > >             if (fieldVal != null) {
> > >                 doc.add(new TextField(idxdFields[i],
> fieldVal.toString(),
> > > Field.Store.YES));
> > >
> > >                 stringsFound = true;
> > >             }
> > >         }
> > >
> > >         BytesRef keyByteRef = new BytesRef(k.valueBytes(coctx));
> > >
> > >         try {
> > >             final Term term = new Term(KEY_FIELD_NAME, keyByteRef);
> > >
> > >             if (!stringsFound) {
> > >                 writer.deleteDocuments(term);
> > >
> > >                 return; // We did not find any strings to be indexed,
> > will
> > > not store data at all.
> > >             }
> > >
> > >             doc.add(new StringField(KEY_FIELD_NAME, keyByteRef,
> > > Field.Store.YES));
> > >
> > >             if (type.valueClass() != String.class)
> > >                 doc.add(new StoredField(VAL_FIELD_NAME,
> > > v.valueBytes(coctx)));
> > >
> > >             doc.add(new StoredField(VER_FIELD_NAME,
> > > ver.toString().getBytes()));
> > >
> > >             doc.add(new LongField(EXPIRATION_TIME_FIELD_NAME, expires,
> > > Field.Store.YES));
> > >
> > >             // Next implies remove than add atomically operation.
> > >             writer.updateDocument(term, doc);
> > >         }
> > >         catch (IOException e) {
> > >             throw new IgniteCheckedException(e);
> > >         }
> > >         finally {
> > >             updateCntr.incrementAndGet();
> > >         }
> > >     }
> > >
> > >     /**
> > >      * Removes entry for given key from this index.
> > >      *
> > >      * @param key Key.
> > >      * @throws IgniteCheckedException If failed.
> > >      */
> > >     public void remove(CacheObject key) throws IgniteCheckedException {
> > >         try {
> > >             writer.deleteDocuments(new Term(KEY_FIELD_NAME,
> > >                 new BytesRef(key.valueBytes(objectContext()))));
> > >         }
> > >         catch (IOException e) {
> > >             throw new IgniteCheckedException(e);
> > >         }
> > >         finally {
> > >             updateCntr.incrementAndGet();
> > >         }
> > >     }
> > >
> > >     /**
> > >      * Runs lucene fulltext query over this index.
> > >      *
> > >      * @param qry Query.
> > >      * @param filters Filters over result.
> > >      * @param pageSize Size of batch
> > >      * @return Query result.
> > >      * @throws IgniteCheckedException If failed.
> > >      */
> > >     public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>>
> query(String
> > > qry, IndexingQueryFilter filters, int pageSize) throws
> > > IgniteCheckedException {
> > >         IndexReader reader;
> > >
> > >         try {
> > >             long updates = updateCntr.get();
> > >
> > >             if (updates != 0) {
> > >                 writer.commit();
> > >
> > >                 updateCntr.addAndGet(-updates);
> > >             }
> > >
> > >             //We can cache reader\searcher and change this to
> > > 'openIfChanged'
> > >             reader = DirectoryReader.open(writer, true);
> > >         }
> > >         catch (IOException e) {
> > >             throw new IgniteCheckedException(e);
> > >         }
> > >
> > >         IndexSearcher searcher;
> > >
> > >         Query query;
> > >
> > >         try {
> > >             searcher = new IndexSearcher(reader);
> > >
> > >             MultiFieldQueryParser parser = new
> > > MultiFieldQueryParser(idxdFields,
> > >                 writer.getAnalyzer());
> > >
> > > //            parser.setAllowLeadingWildcard(true);
> > >
> > >             // Filter expired items.
> > >             Query filter =
> > > NumericRangeQuery.newLongRange(EXPIRATION_TIME_FIELD_NAME,
> > > U.currentTimeMillis(),
> > >                 null, false, false);
> > >
> > >             query = new BooleanQuery.Builder()
> > >                 .add(parser.parse(qry), BooleanClause.Occur.MUST)
> > >                 .add(filter, BooleanClause.Occur.FILTER)
> > >                 .build();
> > >         }
> > >         catch (Exception e) {
> > >             U.closeQuiet(reader);
> > >
> > >             throw new IgniteCheckedException(e);
> > >         }
> > >
> > >         IndexingQueryCacheFilter fltr = null;
> > >
> > >         if (filters != null)
> > >             fltr = filters.forCache(cacheName);
> > >
> > >         return new It<>(reader, searcher, query, fltr, pageSize);
> > >     }
> > >
> > >     /** {@inheritDoc} */
> > >     @Override public void close() {
> > >         U.closeQuiet(writer);
> > >         U.close(dir, ctx.log(GridLuceneIndex.class));
> > >     }
> > >
> > >     /**
> > >      * Key-value iterator over fulltext search result.
> > >      */
> > >     private class It<K, V> extends
> > > GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
> > >         private final int BatchPosBeforeHead = -1;
> > >
> > >         /** */
> > >         private static final long serialVersionUID = 0L;
> > >
> > >         /** */
> > >         private final int pageSize;
> > >
> > >         /** */
> > >         private final IndexReader reader;
> > >
> > >         /** */
> > >         private final Query query;
> > >
> > >         /** */
> > >         private final IndexSearcher searcher;
> > >
> > >         /** current batch docs*/
> > >         private ScoreDoc[] batch;
> > >
> > >         /** current position in batch*/
> > >         private int batchPos = BatchPosBeforeHead;
> > >
> > >         /** */
> > >         private final IndexingQueryCacheFilter filters;
> > >
> > >         /** */
> > >         private IgniteBiTuple<K, V> curr;
> > >
> > >         /** */
> > >         private CacheObjectContext coctx;
> > >
> > >         /**
> > >          * Constructor.
> > >          *
> > >          * @param reader Reader.
> > >          * @param searcher Searcher.
> > >          * @param filters Filters over result.
> > >          * @throws IgniteCheckedException if failed.
> > >          */
> > >         private It(IndexReader reader, IndexSearcher searcher, Query
> > query,
> > > IndexingQueryCacheFilter filters, int pageSize)
> > >             throws IgniteCheckedException {
> > >             this.reader = reader;
> > >             this.searcher = searcher;
> > >             this.filters = filters;
> > >             this.query = query;
> > >             this.pageSize = pageSize;
> > >
> > >             coctx = objectContext();
> > >
> > >             findNext();
> > >         }
> > >
> > >         /**
> > >          * @param bytes Bytes.
> > >          * @param ldr Class loader.
> > >          * @return Object.
> > >          * @throws IgniteCheckedException If failed.
> > >          */
> > >         @SuppressWarnings("unchecked")
> > >         private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr) throws
> > > IgniteCheckedException {
> > >             if (coctx == null) // For tests.
> > >                 return (Z)JdbcUtils.deserialize(bytes, null);
> > >
> > >             return
> > (Z)coctx.kernalContext().cacheObjects().unmarshal(coctx,
> > > bytes, ldr);
> > >         }
> > >
> > >         /**
> > >          * Finds next element.
> > >          *
> > >          * @throws IgniteCheckedException If failed.
> > >          */
> > >         @SuppressWarnings("unchecked")
> > >         private void findNext() throws IgniteCheckedException {
> > >             curr = null;
> > >
> > >             if(isClosed())
> > >                 throw new IgniteCheckedException("Iterator already
> > > closed");
> > >
> > >             if (shouldRequestNextBatch()) {
> > >                 try {
> > >                     requestNextBatch();
> > >                 } catch (IOException e) {
> > >                     close();
> > >                     throw new IgniteCheckedException(e);
> > >                 }
> > >             }
> > >
> > >             if(batch == null)
> > >                 return;
> > >
> > >             while (batchPos < batch.length) {
> > >                 Document doc;
> > >                 ScoreDoc scoreDoc =batch[batchPos++];
> > >
> > >                 try {
> > >                     doc = searcher.doc(scoreDoc.doc);
> > >                 }
> > >                 catch (IOException e) {
> > >                     throw new IgniteCheckedException(e);
> > >                 }
> > >
> > >                 ClassLoader ldr = null;
> > >
> > >                 if (ctx != null && ctx.deploy().enabled())
> > >                     ldr =
> > > ctx.cache().internalCache(cacheName).context().deploy().globalLoader();
> > >
> > >                 K k =
> > unmarshall(doc.getBinaryValue(KEY_FIELD_NAME).bytes,
> > > ldr);
> > >
> > >                 if (filters != null && !filters.apply(k))
> > >                     continue;
> > >
> > >                 V v = type.valueClass() == String.class ?
> > >                     (V)doc.get(VAL_STR_FIELD_NAME) :
> > >
> > > this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME).bytes, ldr);
> > >
> > >                 assert v != null;
> > >
> > >                 curr = new IgniteBiTuple<>(k, v);
> > >
> > >                 break;
> > >             }
> > >         }
> > >
> > >         private boolean shouldRequestNextBatch()  {
> > >             if(batch == null){
> > >                 // should request for first batch
> > >                 return (batchPos == BatchPosBeforeHead) ;
> > >             } else {
> > >                 // should request when reached to the end of batch
> > >                 return (batchPos  == batch.length);
> > >             }
> > >         }
> > >
> > >         private void requestNextBatch() throws IOException {
> > >             TopDocs docs;
> > >
> > >             if (batch == null) {
> > >                 docs = searcher.search(query, pageSize);
> > >             } else {
> > >                 docs = searcher.searchAfter(batch[batch.length - 1],
> > query,
> > > pageSize);
> > >             }
> > >
> > >             if(docs.scoreDocs.length ==0) {
> > >                 batch = null;
> > >             }else{
> > >                 batch = docs.scoreDocs;
> > >             }
> > >
> > >             batchPos = 0;
> > >         }
> > >
> > >         /** {@inheritDoc} */
> > >         @Override protected IgniteBiTuple<K, V> onNext() throws
> > > IgniteCheckedException {
> > >             IgniteBiTuple<K, V> res = curr;
> > >
> > >             findNext();
> > >
> > >             return res;
> > >         }
> > >
> > >         /** {@inheritDoc} */
> > >         @Override protected boolean onHasNext() throws
> > > IgniteCheckedException {
> > >             return curr != null;
> > >         }
> > >
> > >         /** {@inheritDoc} */
> > >         @Override protected void onClose() throws IgniteCheckedException {
> > >             U.closeQuiet(reader);
> > >         }
> > >     }
> > > }
> > > ```
> > >
> > > On Fri, Sep 7, 2018 at 9:02 AM Tâm Nguyễn Mạnh <nguyenmanhtam123@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I tried to implement an iterator for GridLuceneIndex. Could you please help review it?
> > > >
> > > > --
> > > > Thanks & Best Regards
> > > >
> > > > Tam, Nguyen Manh
> > > >
> > > >
> > >
> > > --
> > > Thanks & Best Regards
> > >
> > > Tam, Nguyen Manh
> > >
> >
>
>
> --
> Thanks & Best Regards
>
> Tam, Nguyen Manh
>
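A note on the iterator above: as quoted, findNext() scans only the current batch, so if the cache filter rejects every remaining hit in it, the method appears to return with curr == null, and onHasNext() then reports the end of the results even though later batches may still contain matches. The following is a minimal, Lucene-free sketch of page-at-a-time iteration that keeps requesting batches until a hit passes the filter. PageSource, LazyPagedIterator and CountingRangeSource are hypothetical names used only for illustration; PageSource.fetch(after, pageSize) merely imitates the shape of Lucene's IndexSearcher.searchAfter(ScoreDoc, Query, int) and is not an Ignite or Lucene API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

public class LazyPagedIteratorDemo {
    /** Hypothetical page source: returns up to pageSize hits after 'after' (null means "from the start"). */
    interface PageSource<T> {
        List<T> fetch(T after, int pageSize);
    }

    /** Fetches one page at a time and keeps paging while the filter rejects the remaining hits. */
    static final class LazyPagedIterator<T> implements Iterator<T> {
        private final PageSource<T> src;
        private final Predicate<T> filter;
        private final int pageSize;

        private List<T> page;      // Current page; null until the first fetch.
        private int pos;           // Index of the next hit to read in 'page'.
        private boolean exhausted; // Set once the source returns an empty page.
        private T next;            // Look-ahead hit that passed the filter, or null.

        LazyPagedIterator(PageSource<T> src, Predicate<T> filter, int pageSize) {
            this.src = src;
            this.filter = filter;
            this.pageSize = pageSize;
        }

        @Override public boolean hasNext() {
            while (next == null && !exhausted) {
                if (page == null || pos == page.size()) {
                    // Current page used up (or none yet): fetch the page after its last hit.
                    T after = page == null ? null : page.get(page.size() - 1);
                    List<T> fetched = src.fetch(after, pageSize);

                    if (fetched.isEmpty()) {
                        exhausted = true;
                        break;
                    }

                    page = fetched;
                    pos = 0;
                }

                T candidate = page.get(pos++);

                if (filter.test(candidate))
                    next = candidate;
            }

            return next != null;
        }

        @Override public T next() {
            if (!hasNext())
                throw new NoSuchElementException();

            T res = next;
            next = null;
            return res;
        }
    }

    /** Hypothetical in-memory source over 0..n-1 that counts page requests. */
    static final class CountingRangeSource implements PageSource<Integer> {
        private final int n;
        int fetches;

        CountingRangeSource(int n) {
            this.n = n;
        }

        @Override public List<Integer> fetch(Integer after, int pageSize) {
            fetches++;
            List<Integer> res = new ArrayList<>();
            for (int i = after == null ? 0 : after + 1; i < n && res.size() < pageSize; i++)
                res.add(i);
            return res;
        }
    }

    public static void main(String[] args) {
        CountingRangeSource src = new CountingRangeSource(10);
        // Page size 3; the filter rejects every hit in the first two pages.
        Iterator<Integer> it = new LazyPagedIterator<>(src, x -> x >= 7, 3);
        List<Integer> out = new ArrayList<>();

        while (it.hasNext())
            out.add(it.next());

        System.out.println(out + " after " + src.fetches + " page requests"); // [7, 8, 9] after 5 page requests
    }
}
```

Running main() prints "[7, 8, 9] after 5 page requests": the first two pages are rejected entirely by the filter, yet iteration continues into the third. Unlike the quoted patch, this sketch defers the first fetch from the constructor to the first hasNext() call; in both designs only one page is held in memory at a time.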

Re: IGNITE-7482 Cursor in TextQuery fetches all data in first call to next() or hasNext()

Posted by Tâm Nguyễn Mạnh <ng...@gmail.com>.
Hi,
I have not been assigned yet, but I really want to be.

On Fri, Sep 7, 2018 at 4:13 PM Ilya Kasnacheev <il...@gmail.com> wrote:

> Hello!
>
> Can you please frame it as a GitHub pull request, as per our process? Do you
> have a ticket for that?
>
> Regards,
> --
> Ilya Kasnacheev
>
>
>


-- 
Thanks & Best Regards

Tam, Nguyen Manh

Re: IGNITE-7482 Cursor in TextQuery fetches all data in first call to next() or hasNext()

Posted by Ilya Kasnacheev <il...@gmail.com>.
Hello!

Can you please frame it as a GitHub pull request, as per our process? Do you
have a ticket for that?

Regards,
-- 
Ilya Kasnacheev


On Fri, Sep 7, 2018 at 5:08 AM, Tâm Nguyễn Mạnh <ng...@gmail.com> wrote:

>
> modules\indexing\src\main\java\org\apache\ignite\internal\processors\query\h2\opt\GridLuceneIndex.java
> ```java
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *      http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
>
> package org.apache.ignite.internal.processors.query.h2.opt;
>
> import java.io.IOException;
> import java.util.Collection;
> import java.util.concurrent.atomic.AtomicLong;
> import org.apache.ignite.IgniteCheckedException;
> import org.apache.ignite.internal.GridKernalContext;
> import org.apache.ignite.internal.processors.cache.CacheObject;
> import org.apache.ignite.internal.processors.cache.CacheObjectContext;
> import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
> import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
> import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
> import org.apache.ignite.internal.util.GridAtomicLong;
> import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
> import org.apache.ignite.internal.util.lang.GridCloseableIterator;
> import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
> import org.apache.ignite.internal.util.typedef.internal.U;
> import org.apache.ignite.lang.IgniteBiTuple;
> import org.apache.ignite.spi.indexing.IndexingQueryFilter;
> import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
> import org.apache.lucene.analysis.standard.StandardAnalyzer;
> import org.apache.lucene.document.Document;
> import org.apache.lucene.document.Field;
> import org.apache.lucene.document.LongField;
> import org.apache.lucene.document.StoredField;
> import org.apache.lucene.document.StringField;
> import org.apache.lucene.document.TextField;
> import org.apache.lucene.index.DirectoryReader;
> import org.apache.lucene.index.IndexReader;
> import org.apache.lucene.index.IndexWriter;
> import org.apache.lucene.index.IndexWriterConfig;
> import org.apache.lucene.index.Term;
> import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
> import org.apache.lucene.search.BooleanClause;
> import org.apache.lucene.search.BooleanQuery;
> import org.apache.lucene.search.IndexSearcher;
> import org.apache.lucene.search.NumericRangeQuery;
> import org.apache.lucene.search.Query;
> import org.apache.lucene.search.ScoreDoc;
> import org.apache.lucene.search.TopDocs;
> import org.apache.lucene.util.BytesRef;
> import org.h2.util.JdbcUtils;
> import org.jetbrains.annotations.Nullable;
>
> import static org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
> import static org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME;
>
> /**
>  * Lucene fulltext index.
>  */
> public class GridLuceneIndex implements AutoCloseable {
>     /** Field name for string representation of value. */
>     public static final String VAL_STR_FIELD_NAME = "_gg_val_str__";
>
>     /** Field name for value version. */
>     public static final String VER_FIELD_NAME = "_gg_ver__";
>
>     /** Field name for value expiration time. */
>     public static final String EXPIRATION_TIME_FIELD_NAME = "_gg_expires__";
>
>     /** */
>     private final String cacheName;
>
>     /** */
>     private final GridQueryTypeDescriptor type;
>
>     /** */
>     private final IndexWriter writer;
>
>     /** */
>     private final String[] idxdFields;
>
>     /** */
>     private final AtomicLong updateCntr = new GridAtomicLong();
>
>     /** */
>     private final GridLuceneDirectory dir;
>
>     /** */
>     private final GridKernalContext ctx;
>
>     /**
>      * Constructor.
>      *
>      * @param ctx Kernal context.
>      * @param cacheName Cache name.
>      * @param type Type descriptor.
>      * @throws IgniteCheckedException If failed.
>      */
>     public GridLuceneIndex(GridKernalContext ctx, @Nullable String cacheName, GridQueryTypeDescriptor type)
>         throws IgniteCheckedException {
>         this.ctx = ctx;
>         this.cacheName = cacheName;
>         this.type = type;
>
>         dir = new GridLuceneDirectory(new GridUnsafeMemory(0));
>
>         try {
>             writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()));
>         }
>         catch (IOException e) {
>             throw new IgniteCheckedException(e);
>         }
>
>         GridQueryIndexDescriptor idx = type.textIndex();
>
>         if (idx != null) {
>             Collection<String> fields = idx.fields();
>
>             idxdFields = new String[fields.size() + 1];
>
>             fields.toArray(idxdFields);
>         }
>         else {
>             assert type.valueTextIndex() || type.valueClass() == String.class;
>
>             idxdFields = new String[1];
>         }
>
>         idxdFields[idxdFields.length - 1] = VAL_STR_FIELD_NAME;
>     }
>
>     /**
>      * @return Cache object context.
>      */
>     private CacheObjectContext objectContext() {
>         if (ctx == null)
>             return null;
>
>         return ctx.cache().internalCache(cacheName).context().cacheObjectContext();
>     }
>
>     /**
>      * Stores given data in this fulltext index.
>      *
>      * @param k Key.
>      * @param v Value.
>      * @param ver Version.
>      * @param expires Expiration time.
>      * @throws IgniteCheckedException If failed.
>      */
>     @SuppressWarnings("ConstantConditions")
>     public void store(CacheObject k, CacheObject v, GridCacheVersion ver, long expires)
>         throws IgniteCheckedException {
>         CacheObjectContext coctx = objectContext();
>
>         Object key = k.isPlatformType() ? k.value(coctx, false) : k;
>         Object val = v.isPlatformType() ? v.value(coctx, false) : v;
>
>         Document doc = new Document();
>
>         boolean stringsFound = false;
>
>         if (type.valueTextIndex() || type.valueClass() == String.class) {
>             doc.add(new TextField(VAL_STR_FIELD_NAME, val.toString(), Field.Store.YES));
>
>             stringsFound = true;
>         }
>
>         for (int i = 0, last = idxdFields.length - 1; i < last; i++) {
>             Object fieldVal = type.value(idxdFields[i], key, val);
>
>             if (fieldVal != null) {
>                 doc.add(new TextField(idxdFields[i], fieldVal.toString(), Field.Store.YES));
>
>                 stringsFound = true;
>             }
>         }
>
>         BytesRef keyByteRef = new BytesRef(k.valueBytes(coctx));
>
>         try {
>             final Term term = new Term(KEY_FIELD_NAME, keyByteRef);
>
>             if (!stringsFound) {
>                 writer.deleteDocuments(term);
>
>                 return; // No strings were found to index, so nothing is stored.
>             }
>
>             doc.add(new StringField(KEY_FIELD_NAME, keyByteRef, Field.Store.YES));
>
>             if (type.valueClass() != String.class)
>                 doc.add(new StoredField(VAL_FIELD_NAME, v.valueBytes(coctx)));
>
>             doc.add(new StoredField(VER_FIELD_NAME, ver.toString().getBytes()));
>
>             doc.add(new LongField(EXPIRATION_TIME_FIELD_NAME, expires, Field.Store.YES));
>
>             // Atomically delete any documents containing the key term, then add the new one.
>             writer.updateDocument(term, doc);
>         }
>         catch (IOException e) {
>             throw new IgniteCheckedException(e);
>         }
>         finally {
>             updateCntr.incrementAndGet();
>         }
>     }
>
>     /**
>      * Removes entry for given key from this index.
>      *
>      * @param key Key.
>      * @throws IgniteCheckedException If failed.
>      */
>     public void remove(CacheObject key) throws IgniteCheckedException {
>         try {
>             writer.deleteDocuments(new Term(KEY_FIELD_NAME,
>                 new BytesRef(key.valueBytes(objectContext()))));
>         }
>         catch (IOException e) {
>             throw new IgniteCheckedException(e);
>         }
>         finally {
>             updateCntr.incrementAndGet();
>         }
>     }
>
>     /**
>      * Runs a Lucene full-text query over this index.
>      *
>      * @param qry Query.
>      * @param filters Filters over result.
>      * @param pageSize Number of hits to fetch per batch.
>      * @return Query result.
>      * @throws IgniteCheckedException If failed.
>      */
>     public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(String qry, IndexingQueryFilter filters,
>         int pageSize) throws IgniteCheckedException {
>         IndexReader reader;
>
>         try {
>             long updates = updateCntr.get();
>
>             if (updates != 0) {
>                 writer.commit();
>
>                 updateCntr.addAndGet(-updates);
>             }
>
>             // We could cache the reader/searcher and use 'openIfChanged' here instead.
>             reader = DirectoryReader.open(writer, true);
>         }
>         catch (IOException e) {
>             throw new IgniteCheckedException(e);
>         }
>
>         IndexSearcher searcher;
>
>         Query query;
>
>         try {
>             searcher = new IndexSearcher(reader);
>
>             MultiFieldQueryParser parser = new MultiFieldQueryParser(idxdFields, writer.getAnalyzer());
>
> //            parser.setAllowLeadingWildcard(true);
>
>             // Filter expired items.
>             Query filter = NumericRangeQuery.newLongRange(EXPIRATION_TIME_FIELD_NAME, U.currentTimeMillis(),
>                 null, false, false);
>
>             query = new BooleanQuery.Builder()
>                 .add(parser.parse(qry), BooleanClause.Occur.MUST)
>                 .add(filter, BooleanClause.Occur.FILTER)
>                 .build();
>         }
>         catch (Exception e) {
>             U.closeQuiet(reader);
>
>             throw new IgniteCheckedException(e);
>         }
>
>         IndexingQueryCacheFilter fltr = null;
>
>         if (filters != null)
>             fltr = filters.forCache(cacheName);
>
>         return new It<>(reader, searcher, query, fltr, pageSize);
>     }
>
>     /** {@inheritDoc} */
>     @Override public void close() {
>         U.closeQuiet(writer);
>         U.close(dir, ctx.log(GridLuceneIndex.class));
>     }
>
>     /**
>      * Key-value iterator over fulltext search result.
>      */
>     private class It<K, V> extends GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
>         /** Value of {@code batchPos} before the first batch is requested. */
>         private static final int BatchPosBeforeHead = -1;
>
>         /** */
>         private static final long serialVersionUID = 0L;
>
>         /** */
>         private final int pageSize;
>
>         /** */
>         private final IndexReader reader;
>
>         /** */
>         private final Query query;
>
>         /** */
>         private final IndexSearcher searcher;
>
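>         /** Current batch of hits; {@code null} before the first request and once the results are exhausted. */
>         private ScoreDoc[] batch;
>
>         /** Position of the next hit to read in the current batch. */
>         private int batchPos = BatchPosBeforeHead;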
>
>         /** */
>         private final IndexingQueryCacheFilter filters;
>
>         /** */
>         private IgniteBiTuple<K, V> curr;
>
>         /** */
>         private CacheObjectContext coctx;
>
>         /**
>          * Constructor.
>          *
>          * @param reader Reader.
>          * @param searcher Searcher.
>          * @param query Query.
>          * @param filters Filters over result.
>          * @param pageSize Number of hits to fetch per batch.
>          * @throws IgniteCheckedException If failed.
>          */
>         private It(IndexReader reader, IndexSearcher searcher, Query query, IndexingQueryCacheFilter filters,
>             int pageSize) throws IgniteCheckedException {
>             this.reader = reader;
>             this.searcher = searcher;
>             this.filters = filters;
>             this.query = query;
>             this.pageSize = pageSize;
>
>             coctx = objectContext();
>
>             findNext();
>         }
>
>         /**
>          * @param bytes Bytes.
>          * @param ldr Class loader.
>          * @return Object.
>          * @throws IgniteCheckedException If failed.
>          */
>         @SuppressWarnings("unchecked")
>         private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr) throws IgniteCheckedException {
>             if (coctx == null) // For tests.
>                 return (Z)JdbcUtils.deserialize(bytes, null);
>
>             return (Z)coctx.kernalContext().cacheObjects().unmarshal(coctx,
> bytes, ldr);
>         }
>
>         /**
>          * Finds next element.
>          *
>          * @throws IgniteCheckedException If failed.
>          */
>         @SuppressWarnings("unchecked")
>         private void findNext() throws IgniteCheckedException {
>             curr = null;
>
>             if(isClosed())
>                 throw new IgniteCheckedException("Iterator already closed");
>
>             if (shouldRequestNextBatch()) {
>                 try {
>                     requestNextBatch();
>                 } catch (IOException e) {
>                     close();
>                     throw new IgniteCheckedException(e);
>                 }
>             }
>
>             if(batch == null)
>                 return;
>
>             while (batchPos < batch.length) {
>                 Document doc;
>                 ScoreDoc scoreDoc =batch[batchPos++];
>
>                 try {
>                     doc = searcher.doc(scoreDoc.doc);
>                 }
>                 catch (IOException e) {
>                     throw new IgniteCheckedException(e);
>                 }
>
>                 ClassLoader ldr = null;
>
>                 if (ctx != null && ctx.deploy().enabled())
>                     ldr = ctx.cache().internalCache(cacheName).context().deploy().globalLoader();
>
>                 K k = unmarshall(doc.getBinaryValue(KEY_FIELD_NAME).bytes, ldr);
>
>                 if (filters != null && !filters.apply(k))
>                     continue;
>
>                 V v = type.valueClass() == String.class ?
>                     (V)doc.get(VAL_STR_FIELD_NAME) :
>                     this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME).bytes, ldr);
>
>                 assert v != null;
>
>                 curr = new IgniteBiTuple<>(k, v);
>
>                 break;
>             }
>         }
>
>         private boolean shouldRequestNextBatch()  {
>             if(batch == null){
>                 // should request for first batch
>                 return (batchPos == BatchPosBeforeHead) ;
>             } else {
>                 // should request when reached to the end of batch
>                 return (batchPos  == batch.length);
>             }
>         }
>
>         private void requestNextBatch() throws IOException {
>             TopDocs docs;
>
>             if (batch == null) {
>                 docs = searcher.search(query, pageSize);
>             } else {
>                 docs = searcher.searchAfter(batch[batch.length - 1], query, pageSize);
>             }
>
>             if(docs.scoreDocs.length ==0) {
>                 batch = null;
>             }else{
>                 batch = docs.scoreDocs;
>             }
>
>             batchPos = 0;
>         }
>
>         /** {@inheritDoc} */
>         @Override protected IgniteBiTuple<K, V> onNext() throws IgniteCheckedException {
>             IgniteBiTuple<K, V> res = curr;
>
>             findNext();
>
>             return res;
>         }
>
>         /** {@inheritDoc} */
>         @Override protected boolean onHasNext() throws IgniteCheckedException {
>             return curr != null;
>         }
>
>         /** {@inheritDoc} */
>         @Override protected void onClose() throws IgniteCheckedException {
>             U.closeQuiet(reader);
>         }
>     }
> }
> ```
>
> On Fri, Sep 7, 2018 at 9:02 AM Tâm Nguyễn Mạnh <nguyenmanhtam123@gmail.com>
> wrote:
>
> > Hi,
> >
> > I tried to implement an iterator for GridLuceneIndex; could you please help
> > review it?
> >
> > --
> > Thanks & Best Regards
> >
> > Tam, Nguyen Manh
> >
> >
>
> --
> Thanks & Best Regards
>
> Tam, Nguyen Manh
>

Re: IGNITE-7482 Cursor in TextQuery fetches all data in first call to next() or hasNext()

Posted by Tâm Nguyễn Mạnh <ng...@gmail.com>.
modules\indexing\src\main\java\org\apache\ignite\internal\processors\query\h2\opt\GridLuceneIndex.java
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.query.h2.opt;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.NumericRangeQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.BytesRef;
import org.h2.util.JdbcUtils;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
import static org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME;

/**
 * Lucene fulltext index.
 */
public class GridLuceneIndex implements AutoCloseable {
    /** Field name for string representation of value. */
    public static final String VAL_STR_FIELD_NAME = "_gg_val_str__";

    /** Field name for value version. */
    public static final String VER_FIELD_NAME = "_gg_ver__";

    /** Field name for value expiration time. */
    public static final String EXPIRATION_TIME_FIELD_NAME = "_gg_expires__";

    /** */
    private final String cacheName;

    /** */
    private final GridQueryTypeDescriptor type;

    /** */
    private final IndexWriter writer;

    /** */
    private final String[] idxdFields;

    /** */
    private final AtomicLong updateCntr = new GridAtomicLong();

    /** */
    private final GridLuceneDirectory dir;

    /** */
    private final GridKernalContext ctx;

    /**
     * Constructor.
     *
     * @param ctx Kernal context.
     * @param cacheName Cache name.
     * @param type Type descriptor.
     * @throws IgniteCheckedException If failed.
     */
    public GridLuceneIndex(GridKernalContext ctx, @Nullable String cacheName, GridQueryTypeDescriptor type)
        throws IgniteCheckedException {
        this.ctx = ctx;
        this.cacheName = cacheName;
        this.type = type;

        dir = new GridLuceneDirectory(new GridUnsafeMemory(0));

        try {
            writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()));
        }
        catch (IOException e) {
            throw new IgniteCheckedException(e);
        }

        GridQueryIndexDescriptor idx = type.textIndex();

        if (idx != null) {
            Collection<String> fields = idx.fields();

            idxdFields = new String[fields.size() + 1];

            fields.toArray(idxdFields);
        }
        else {
            assert type.valueTextIndex() || type.valueClass() == String.class;

            idxdFields = new String[1];
        }

        idxdFields[idxdFields.length - 1] = VAL_STR_FIELD_NAME;
    }

    /**
     * @return Cache object context.
     */
    private CacheObjectContext objectContext() {
        if (ctx == null)
            return null;

        return ctx.cache().internalCache(cacheName).context().cacheObjectContext();
    }

    /**
     * Stores given data in this fulltext index.
     *
     * @param k Key.
     * @param v Value.
     * @param ver Version.
     * @param expires Expiration time.
     * @throws IgniteCheckedException If failed.
     */
    @SuppressWarnings("ConstantConditions")
    public void store(CacheObject k, CacheObject v, GridCacheVersion ver, long expires)
        throws IgniteCheckedException {
        CacheObjectContext coctx = objectContext();

        Object key = k.isPlatformType() ? k.value(coctx, false) : k;
        Object val = v.isPlatformType() ? v.value(coctx, false) : v;

        Document doc = new Document();

        boolean stringsFound = false;

        if (type.valueTextIndex() || type.valueClass() == String.class) {
            doc.add(new TextField(VAL_STR_FIELD_NAME, val.toString(), Field.Store.YES));

            stringsFound = true;
        }

        for (int i = 0, last = idxdFields.length - 1; i < last; i++) {
            Object fieldVal = type.value(idxdFields[i], key, val);

            if (fieldVal != null) {
                doc.add(new TextField(idxdFields[i], fieldVal.toString(), Field.Store.YES));

                stringsFound = true;
            }
        }

        BytesRef keyByteRef = new BytesRef(k.valueBytes(coctx));

        try {
            final Term term = new Term(KEY_FIELD_NAME, keyByteRef);

            if (!stringsFound) {
                writer.deleteDocuments(term);

                return; // No strings were found to index, so nothing is stored for this key.
            }

            doc.add(new StringField(KEY_FIELD_NAME, keyByteRef, Field.Store.YES));

            if (type.valueClass() != String.class)
                doc.add(new StoredField(VAL_FIELD_NAME, v.valueBytes(coctx)));

            doc.add(new StoredField(VER_FIELD_NAME, ver.toString().getBytes()));

            doc.add(new LongField(EXPIRATION_TIME_FIELD_NAME, expires, Field.Store.YES));

            // Atomically deletes any document with this key, then adds the new one.
            writer.updateDocument(term, doc);
        }
        catch (IOException e) {
            throw new IgniteCheckedException(e);
        }
        finally {
            updateCntr.incrementAndGet();
        }
    }

    /**
     * Removes entry for given key from this index.
     *
     * @param key Key.
     * @throws IgniteCheckedException If failed.
     */
    public void remove(CacheObject key) throws IgniteCheckedException {
        try {
            writer.deleteDocuments(new Term(KEY_FIELD_NAME,
                new BytesRef(key.valueBytes(objectContext()))));
        }
        catch (IOException e) {
            throw new IgniteCheckedException(e);
        }
        finally {
            updateCntr.incrementAndGet();
        }
    }

    /**
     * Runs a Lucene fulltext query over this index.
     *
     * @param qry Query.
     * @param filters Filters over result.
     * @param pageSize Number of hits fetched from Lucene per batch.
     * @return Query result.
     * @throws IgniteCheckedException If failed.
     */
    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(String qry, IndexingQueryFilter filters,
        int pageSize) throws IgniteCheckedException {
        IndexReader reader;

        try {
            long updates = updateCntr.get();

            if (updates != 0) {
                writer.commit();

                updateCntr.addAndGet(-updates);
            }

            // The reader/searcher could be cached and refreshed with 'openIfChanged' instead.
            reader = DirectoryReader.open(writer, true);
        }
        catch (IOException e) {
            throw new IgniteCheckedException(e);
        }

        IndexSearcher searcher;

        Query query;

        try {
            searcher = new IndexSearcher(reader);

            MultiFieldQueryParser parser = new MultiFieldQueryParser(idxdFields, writer.getAnalyzer());

//            parser.setAllowLeadingWildcard(true);

            // Filter expired items.
            Query filter = NumericRangeQuery.newLongRange(EXPIRATION_TIME_FIELD_NAME, U.currentTimeMillis(),
                null, false, false);

            query = new BooleanQuery.Builder()
                .add(parser.parse(qry), BooleanClause.Occur.MUST)
                .add(filter, BooleanClause.Occur.FILTER)
                .build();
        }
        catch (Exception e) {
            U.closeQuiet(reader);

            throw new IgniteCheckedException(e);
        }

        IndexingQueryCacheFilter fltr = null;

        if (filters != null)
            fltr = filters.forCache(cacheName);

        return new It<>(reader, searcher, query, fltr, pageSize);
    }

    /** {@inheritDoc} */
    @Override public void close() {
        U.closeQuiet(writer);
        U.close(dir, ctx.log(GridLuceneIndex.class));
    }

    /**
     * Key-value iterator over fulltext search result.
     */
    private class It<K, V> extends GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
        /** Value of {@code batchPos} before the first batch has been requested. */
        private static final int BatchPosBeforeHead = -1;

        /** */
        private static final long serialVersionUID = 0L;

        /** */
        private final int pageSize;

        /** */
        private final IndexReader reader;

        /** */
        private final Query query;

        /** */
        private final IndexSearcher searcher;

        /** Current batch of hits. */
        private ScoreDoc[] batch;

        /** Current position in the batch. */
        private int batchPos = BatchPosBeforeHead;

        /** */
        private final IndexingQueryCacheFilter filters;

        /** */
        private IgniteBiTuple<K, V> curr;

        /** */
        private CacheObjectContext coctx;

        /**
         * Constructor.
         *
         * @param reader Reader.
         * @param searcher Searcher.
         * @param query Query to run.
         * @param filters Filters over result.
         * @param pageSize Number of hits fetched per batch.
         * @throws IgniteCheckedException if failed.
         */
        private It(IndexReader reader, IndexSearcher searcher, Query query, IndexingQueryCacheFilter filters,
            int pageSize) throws IgniteCheckedException {
            this.reader = reader;
            this.searcher = searcher;
            this.filters = filters;
            this.query = query;
            this.pageSize = pageSize;

            coctx = objectContext();

            findNext();
        }

        /**
         * @param bytes Bytes.
         * @param ldr Class loader.
         * @return Object.
         * @throws IgniteCheckedException If failed.
         */
        @SuppressWarnings("unchecked")
        private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr) throws IgniteCheckedException {
            if (coctx == null) // For tests.
                return (Z)JdbcUtils.deserialize(bytes, null);

            return (Z)coctx.kernalContext().cacheObjects().unmarshal(coctx, bytes, ldr);
        }
        }

        /**
         * Finds the next entry that passes the filter, requesting further batches as needed.
         *
         * @throws IgniteCheckedException If failed.
         */
        @SuppressWarnings("unchecked")
        private void findNext() throws IgniteCheckedException {
            curr = null;

            if (isClosed())
                throw new IgniteCheckedException("Iterator already closed");

            // Keep requesting batches until an entry passes the filter or the results run out;
            // otherwise a batch whose remaining entries are all filtered out would end iteration early.
            while (true) {
                if (shouldRequestNextBatch()) {
                    try {
                        requestNextBatch();
                    }
                    catch (IOException e) {
                        close();

                        throw new IgniteCheckedException(e);
                    }
                }

                // No more search results.
                if (batch == null)
                    return;

                while (batchPos < batch.length) {
                    ScoreDoc scoreDoc = batch[batchPos++];

                    Document doc;

                    try {
                        doc = searcher.doc(scoreDoc.doc);
                    }
                    catch (IOException e) {
                        throw new IgniteCheckedException(e);
                    }

                    ClassLoader ldr = null;

                    if (ctx != null && ctx.deploy().enabled())
                        ldr = ctx.cache().internalCache(cacheName).context().deploy().globalLoader();

                    K k = unmarshall(doc.getBinaryValue(KEY_FIELD_NAME).bytes, ldr);

                    if (filters != null && !filters.apply(k))
                        continue;

                    V v = type.valueClass() == String.class ?
                        (V)doc.get(VAL_STR_FIELD_NAME) :
                        this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME).bytes, ldr);

                    assert v != null;

                    curr = new IgniteBiTuple<>(k, v);

                    return;
                }
            }
        }

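        /**
         * @return {@code true} if the first batch has not been requested yet or the current batch is exhausted.
         */
        private boolean shouldRequestNextBatch() {
            // Request the first batch only once; a null batch afterwards means there are no more results.
            if (batch == null)
                return batchPos == BatchPosBeforeHead;

            // Request the next batch once the end of the current one has been reached.
            return batchPos == batch.length;
        }

        /**
         * Fetches up to {@code pageSize} hits, continuing after the last hit of the current batch.
         *
         * @throws IOException If failed.
         */
        private void requestNextBatch() throws IOException {
            TopDocs docs = batch == null ?
                searcher.search(query, pageSize) :
                searcher.searchAfter(batch[batch.length - 1], query, pageSize);

            // An empty page means the results are exhausted.
            batch = docs.scoreDocs.length == 0 ? null : docs.scoreDocs;

            batchPos = 0;
        }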

        /** {@inheritDoc} */
        @Override protected IgniteBiTuple<K, V> onNext() throws IgniteCheckedException {
            IgniteBiTuple<K, V> res = curr;

            findNext();

            return res;
        }

        /** {@inheritDoc} */
        @Override protected boolean onHasNext() throws IgniteCheckedException {
            return curr != null;
        }

        /** {@inheritDoc} */
        @Override protected void onClose() throws IgniteCheckedException {
            U.closeQuiet(reader);
        }
    }
}
```
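For review, a minimal Lucene-free sketch of the paging pattern `It` relies on: hits are pulled one page of `pageSize` at a time (as with `IndexSearcher.search` followed by `searchAfter` on the last hit), so `hasNext()` triggers at most one page fetch instead of loading all results. `PagedFilteringIterator`, `PageSource` and `pagesFetched()` are hypothetical names for illustration only; `PageSource` stands in for the searcher. Note that `advance()` keeps requesting pages while the filter rejects candidates: a loop that only scans the current batch would report the end of results as soon as the remaining entries of one batch were all filtered out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Sketch: lazily paged, filtering iterator. Null elements are not supported. */
class PagedFilteringIterator<T> implements Iterator<T> {
    /** Hypothetical page supplier modeled on search()/searchAfter(): returns at most pageSize
     *  elements following 'after' (the first page when 'after' is null); empty means no more. */
    interface PageSource<E> {
        List<E> fetch(E after, int pageSize);
    }

    private final PageSource<T> src;
    private final Predicate<T> filter;
    private final int pageSize;

    private List<T> page;      // current page; null until the first fetch
    private int pos;           // index of the next unread element in page
    private boolean exhausted; // set once the source returned an empty page
    private T next;            // prefetched accepted element; null if none
    private int fetches;       // number of fetch() calls so far

    PagedFilteringIterator(PageSource<T> src, Predicate<T> filter, int pageSize) {
        this.src = src;
        this.filter = filter;
        this.pageSize = pageSize;
        advance(); // reads only as far as the first accepted element
    }

    /** Moves to the next accepted element, fetching further pages on demand. */
    private void advance() {
        next = null;
        while (!exhausted) {
            if (page == null || pos == page.size()) {
                // Continue after the last element of the previous page, like searchAfter().
                T after = page == null ? null : page.get(page.size() - 1);
                List<T> fetched = src.fetch(after, pageSize);
                fetches++;
                if (fetched.isEmpty()) {
                    exhausted = true;
                    return;
                }
                page = fetched;
                pos = 0;
            }
            T candidate = page.get(pos++);
            if (filter.test(candidate)) {
                next = candidate;
                return;
            }
            // Rejected: keep scanning, crossing into the next page if needed.
        }
    }

    int pagesFetched() {
        return fetches;
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    @Override
    public T next() {
        if (next == null)
            throw new NoSuchElementException();
        T res = next;
        advance();
        return res;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // Serves elements strictly after 'after' in a fixed order, at most n at a time.
        PageSource<Integer> src = (after, n) -> {
            int start = after == null ? 0 : data.indexOf(after) + 1;
            return data.subList(start, Math.min(start + n, data.size()));
        };
        PagedFilteringIterator<Integer> it = new PagedFilteringIterator<>(src, x -> x % 2 == 0, 3);
        System.out.println("pages after construction: " + it.pagesFetched()); // 1
        List<Integer> out = new ArrayList<>();
        while (it.hasNext())
            out.add(it.next());
        System.out.println(out + ", pages: " + it.pagesFetched()); // [2, 4, 6, 8, 10], pages: 5
    }
}
```

With `pageSize = 3` and an even-number filter, the first page `[1, 2, 3]` ends with a rejected entry; the sketch still moves on to `[4, 5, 6]` and so on, and the fifth fetch is the empty page that signals the end.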

On Fri, Sep 7, 2018 at 9:02 AM Tâm Nguyễn Mạnh <ng...@gmail.com>
wrote:

> Hi,
>
> I tried to implement an iterator for GridLuceneIndex; could you please help
> review it?
>
> --
> Thanks & Best Regards
>
> Tam, Nguyen Manh
>
>

-- 
Thanks & Best Regards

Tam, Nguyen Manh