You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2014/12/10 16:59:15 UTC
[06/17] incubator-ignite git commit: ignite-qry - merged
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneIndex.java
new file mode 100644
index 0000000..dd2a29e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneIndex.java
@@ -0,0 +1,384 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+import org.apache.commons.codec.binary.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.spi.indexing.*;
+import org.apache.lucene.analysis.standard.*;
+import org.apache.lucene.document.*;
+import org.apache.lucene.index.*;
+import org.apache.lucene.queryParser.*;
+import org.apache.lucene.search.*;
+import org.apache.lucene.util.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.processors.query.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.offheap.unsafe.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.gridgain.grid.kernal.processors.query.h2.GridH2Indexing.*;
+
+/**
+ * Lucene fulltext index.
+ */
+public class GridLuceneIndex implements Closeable {
+ /** Field name for string representation of value. */
+ public static final String VAL_STR_FIELD_NAME = "_gg_val_str__";
+
+ /** Field name for value version. */
+ public static final String VER_FIELD_NAME = "_gg_ver__";
+
+ /** Field name for value expiration time. */
+ public static final String EXPIRATION_TIME_FIELD_NAME = "_gg_expires__";
+
+ /** Marshaller used to serialize keys and values put into documents and to read them back. */
+ private final IgniteMarshaller marshaller;
+
+ /** Space name; passed to {@code filters.forSpace(...)} when a query is run. */
+ private final String spaceName;
+
+ /** Descriptor of the indexed type. */
+ private final GridQueryTypeDescriptor type;
+
+ /** Lucene index writer created over {@link #dir}. */
+ private final IndexWriter writer;
+
+ /** Names of the indexed fields; the last element is always {@link #VAL_STR_FIELD_NAME}. */
+ private final String[] idxdFields;
+
+ /** Whether the marshalled value is added to the document (not done when the value class is String). */
+ private final boolean storeVal;
+
+ /** Bit {@code i} is set when {@code idxdFields[i]} is read from the key rather than from the value. */
+ private final BitSet keyFields = new BitSet();
+
+ /** Incremented by store/remove; when non-zero, {@code query} commits the writer and subtracts the observed count. */
+ private final AtomicLong updateCntr = new GridAtomicLong();
+
+ /** Lucene directory the writer operates on. */
+ private final GridLuceneDirectory dir;
+
+ /**
+ * Creates the Lucene index writer and determines which fields of {@code type} are indexed.
+ *
+ * @param marshaller Indexing marshaller.
+ * @param mem Unsafe memory for the directory; if {@code null}, a new {@code GridUnsafeMemory(0)} is created.
+ * @param spaceName Space name.
+ * @param type Type descriptor.
+ * @param storeVal Store value in index.
+ * @throws GridException If the index writer could not be created.
+ */
+ public GridLuceneIndex(IgniteMarshaller marshaller, @Nullable GridUnsafeMemory mem,
+ @Nullable String spaceName, GridQueryTypeDescriptor type, boolean storeVal) throws GridException {
+ this.marshaller = marshaller;
+ this.spaceName = spaceName;
+ this.type = type;
+ this.storeVal = storeVal;
+
+ dir = new GridLuceneDirectory(mem == null ? new GridUnsafeMemory(0) : mem);
+
+ try {
+ writer = new IndexWriter(dir, new IndexWriterConfig(Version.LUCENE_30, new StandardAnalyzer(
+ Version.LUCENE_30)));
+ }
+ catch (IOException e) {
+ throw new GridException(e);
+ }
+
+ GridQueryIndexDescriptor idx = null;
+
+ for (GridQueryIndexDescriptor descriptor : type.indexes().values()) { // Pick the first FULLTEXT descriptor.
+ if (descriptor.type() == GridQueryIndexType.FULLTEXT) {
+ idx = descriptor;
+
+ break;
+ }
+ }
+
+ if (idx != null) {
+ Collection<String> fields = idx.fields();
+
+ idxdFields = new String[fields.size() + 1]; // Last slot is reserved for VAL_STR_FIELD_NAME.
+
+ fields.toArray(idxdFields);
+
+ for (int i = 0, len = fields.size() ; i < len; i++)
+ keyFields.set(i, type.keyFields().containsKey(idxdFields[i]));
+ }
+ else {
+ assert type.valueTextIndex() || type.valueClass() == String.class;
+
+ idxdFields = new String[1];
+ }
+
+ idxdFields[idxdFields.length - 1] = VAL_STR_FIELD_NAME;
+ }
+
+ /**
+ * Stores given data in this fulltext index; any document previously stored for the key is deleted first.
+ *
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param expires Expiration time.
+ * @throws GridException If failed.
+ */
+ public void store(Object key, Object val, byte[] ver, long expires) throws GridException {
+ Document doc = new Document();
+
+ boolean stringsFound = false;
+
+ if (type.valueTextIndex() || type.valueClass() == String.class) {
+ doc.add(new Field(VAL_STR_FIELD_NAME, val.toString(), Field.Store.YES, Field.Index.ANALYZED));
+
+ stringsFound = true;
+ }
+
+ for (int i = 0, last = idxdFields.length - 1; i < last; i++) { // Last slot (VAL_STR_FIELD_NAME) is handled above.
+ Object fieldVal = type.value(keyFields.get(i) ? key : val, idxdFields[i]);
+
+ if (fieldVal != null) {
+ doc.add(new Field(idxdFields[i], fieldVal.toString(), Field.Store.YES, Field.Index.ANALYZED));
+
+ stringsFound = true;
+ }
+ }
+
+ String keyStr = Base64.encodeBase64String(marshaller.marshal(key));
+
+ try {
+ // Delete first to avoid duplicates.
+ writer.deleteDocuments(new Term(KEY_FIELD_NAME, keyStr));
+
+ if (!stringsFound)
+ return; // We did not find any strings to be indexed, will not store data at all.
+
+ doc.add(new Field(KEY_FIELD_NAME, keyStr, Field.Store.YES, Field.Index.NOT_ANALYZED));
+
+ if (storeVal && type.valueClass() != String.class)
+ doc.add(new Field(VAL_FIELD_NAME, marshaller.marshal(val)));
+
+ doc.add(new Field(VER_FIELD_NAME, ver));
+
+ doc.add(new Field(EXPIRATION_TIME_FIELD_NAME, DateTools.timeToString(expires,
+ DateTools.Resolution.MILLISECOND), Field.Store.YES, Field.Index.NOT_ANALYZED));
+
+ writer.addDocument(doc);
+ }
+ catch (IOException e) {
+ throw new GridException(e);
+ }
+ finally {
+ updateCntr.incrementAndGet(); // Runs on every exit from the try block, including the early return.
+ }
+ }
+
+ /**
+ * Removes entry for given key from this index.
+ *
+ * @param key Key; documents are matched by the Base64 form of its marshalled bytes.
+ * @throws GridException If failed.
+ */
+ public void remove(Object key) throws GridException {
+ try {
+ writer.deleteDocuments(new Term(KEY_FIELD_NAME, Base64.encodeBase64String(marshaller.marshal(key))));
+ }
+ catch (IOException e) {
+ throw new GridException(e);
+ }
+ finally {
+ updateCntr.incrementAndGet(); // Counted even when the delete throws IOException.
+ }
+ }
+
+ /**
+ * Runs lucene fulltext query over this index, committing pending updates first and filtering out expired items.
+ *
+ * @param qry Query.
+ * @param filters Filters over result, may be {@code null}.
+ * @return Query result.
+ * @throws GridException If failed. NOTE(review): {@code reader} is not closed when the search below throws - confirm.
+ */
+ public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(String qry,
+ GridIndexingQueryFilter filters) throws GridException {
+ IndexReader reader;
+
+ try {
+ long updates = updateCntr.get();
+
+ if (updates != 0) {
+ writer.commit();
+
+ updateCntr.addAndGet(-updates); // Subtract only what was observed; later increments are kept.
+ }
+
+ reader = IndexReader.open(writer, true);
+ }
+ catch (IOException e) {
+ throw new GridException(e);
+ }
+
+ IndexSearcher searcher = new IndexSearcher(reader);
+
+ MultiFieldQueryParser parser = new MultiFieldQueryParser(Version.LUCENE_30, idxdFields,
+ writer.getAnalyzer());
+
+ // Filter expired items: the range starts at the current time (exclusive) and has no upper bound.
+ Filter f = new TermRangeFilter(EXPIRATION_TIME_FIELD_NAME, DateTools.timeToString(U.currentTimeMillis(),
+ DateTools.Resolution.MILLISECOND), null, false, false);
+
+ TopDocs docs;
+
+ try {
+ docs = searcher.search(parser.parse(qry), f, Integer.MAX_VALUE);
+ }
+ catch (Exception e) {
+ throw new GridException(e);
+ }
+
+ IgniteBiPredicate<K, V> fltr = null;
+
+ if (filters != null)
+ fltr = filters.forSpace(spaceName);
+
+ return new It<>(reader, searcher, docs.scoreDocs, fltr);
+ }
+
+ /** {@inheritDoc} Closes the writer and then the directory, both via {@code U.closeQuiet}. */
+ @Override public void close() {
+ U.closeQuiet(writer);
+ U.closeQuiet(dir);
+ }
+
+ /**
+ * Key-value iterator over fulltext search result; the next matching tuple is looked up eagerly.
+ */
+ private class It<K, V> extends GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Index reader; closed in {@link #onClose()}. */
+ private final IndexReader reader;
+
+ /** Searcher used to load documents by id; closed in {@link #onClose()}. */
+ private final IndexSearcher searcher;
+
+ /** Search hits to iterate over. */
+ private final ScoreDoc[] docs;
+
+ /** Key-value predicate; {@code null} means every entry passes. */
+ private final IgniteBiPredicate<K, V> filters;
+
+ /** Index of the next element of {@link #docs} to examine. */
+ private int idx;
+
+ /** Tuple to be returned by the next {@link #onNext()} call, or {@code null} if there is none. */
+ private IgniteBiTuple<K, V> curr;
+
+ /**
+ * Constructor; immediately positions the iterator on the first matching element.
+ *
+ * @param reader Reader.
+ * @param searcher Searcher.
+ * @param docs Docs.
+ * @param filters Filters over result, may be {@code null}.
+ * @throws GridException if failed.
+ */
+ private It(IndexReader reader, IndexSearcher searcher, ScoreDoc[] docs, IgniteBiPredicate<K, V> filters)
+ throws GridException {
+ this.reader = reader;
+ this.searcher = searcher;
+ this.docs = docs;
+ this.filters = filters;
+
+ findNext();
+ }
+
+ /**
+ * Filters key using predicates.
+ *
+ * @param key Key.
+ * @param val Value.
+ * @return {@code True} if key passes filter or no filter is set.
+ */
+ private boolean filter(K key, V val) {
+ return filters == null || filters.apply(key, val) ;
+ }
+
+ /**
+ * Finds next element that passes the filter and stores it in {@code curr}; leaves {@code curr} null if none is left.
+ *
+ * @throws GridException If failed.
+ */
+ private void findNext() throws GridException {
+ curr = null;
+
+ while (idx < docs.length) {
+ Document doc;
+
+ try {
+ doc = searcher.doc(docs[idx++].doc);
+ }
+ catch (IOException e) {
+ throw new GridException(e);
+ }
+
+ String keyStr = doc.get(KEY_FIELD_NAME);
+
+ ClassLoader ldr = null; // TODO
+
+ K k = marshaller.unmarshal(Base64.decodeBase64(keyStr), ldr);
+
+ byte[] valBytes = doc.getBinaryValue(VAL_FIELD_NAME);
+
+ V v = valBytes != null ? marshaller.<V>unmarshal(valBytes, ldr) : // Else: stored string for String values, or null.
+ type.valueClass() == String.class ?
+ (V)doc.get(VAL_STR_FIELD_NAME): null;
+
+ if (!filter(k, v))
+ continue;
+
+// byte[] ver = doc.getBinaryValue(VER_FIELD_NAME); TODO rm version
+
+ curr = new IgniteBiTuple<>(k, v);
+
+ break;
+ }
+ }
+
+ /** {@inheritDoc} Returns the prefetched tuple and then looks up the following one. */
+ @Override protected IgniteBiTuple<K, V> onNext() throws GridException {
+ IgniteBiTuple<K, V> res = curr;
+
+ findNext();
+
+ return res;
+ }
+
+ /** {@inheritDoc} True while a prefetched tuple is available. */
+ @Override protected boolean onHasNext() throws GridException {
+ return curr != null;
+ }
+
+ /** {@inheritDoc} Closes the searcher and then the reader via {@code U.closeQuiet}. */
+ @Override protected void onClose() throws GridException {
+ U.closeQuiet(searcher);
+ U.closeQuiet(reader);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneInputStream.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneInputStream.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneInputStream.java
new file mode 100644
index 0000000..2f3fe89
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneInputStream.java
@@ -0,0 +1,220 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+import org.apache.lucene.store.*;
+import org.gridgain.grid.util.offheap.unsafe.*;
+
+import java.io.*;
+
+import static org.gridgain.grid.kernal.processors.query.h2.opt.GridLuceneOutputStream.*;
+
+/**
+ * A memory-resident {@link IndexInput} implementation.
+ */
+public class GridLuceneInputStream extends IndexInput {
+ /** File being read. */
+ private GridLuceneFile file;
+
+ /** File length as reported by the file when this stream was constructed. */
+ private long length;
+
+ /** Pointer to the current buffer as returned by {@code file.getBuffer}; {@code 0} until a buffer is selected. */
+ private long currBuf;
+
+ /** Index of the current buffer within the file; {@code -1} until the first buffer is selected. */
+ private int currBufIdx;
+
+ /** Read position inside the current buffer. */
+ private int bufPosition;
+
+ /** File offset at which the current buffer starts ({@code BUFFER_SIZE * currBufIdx}). */
+ private long bufStart;
+
+ /** Number of readable bytes in the current buffer (at most {@code BUFFER_SIZE}). */
+ private int bufLength;
+
+ /** Memory through which buffer contents are read; taken from the file's directory. */
+ private final GridUnsafeMemory mem;
+
+ /**
+ * Constructor; captures the file length and defers selecting the first buffer until the first read or seek.
+ *
+ * @param name Name.
+ * @param f File.
+ * @throws IOException If the file length divided by {@code BUFFER_SIZE} does not fit below {@code Integer.MAX_VALUE}.
+ */
+ public GridLuceneInputStream(String name, GridLuceneFile f) throws IOException {
+ super("RAMInputStream(name=" + name + ")");
+
+ file = f;
+
+ length = file.getLength();
+
+ if (length / BUFFER_SIZE >= Integer.MAX_VALUE)
+ throw new IOException("RAMInputStream too large length=" + length + ": " + name);
+
+ mem = file.getDirectory().memory();
+
+ // make sure that we switch to the
+ // first needed buffer lazily
+ currBufIdx = -1;
+ currBuf = 0;
+ }
+
+ /** {@inheritDoc} No-op in this implementation. */
+ @Override public void close() {
+ // nothing to do here
+ }
+
+ /** {@inheritDoc} Returns the length captured from the file at construction time. */
+ @Override public long length() {
+ return length;
+ }
+
+ /** {@inheritDoc} Moves to the next buffer first when the current one is exhausted; fails with EOF past the last one. */
+ @Override public byte readByte() throws IOException {
+ if (bufPosition >= bufLength) {
+ currBufIdx++;
+
+ switchCurrentBuffer(true);
+ }
+
+ return mem.readByte(currBuf + bufPosition++);
+ }
+
+ /** {@inheritDoc} Copies in chunks, each limited to what remains in the current buffer. */
+ @Override public void readBytes(byte[] b, int offset, int len) throws IOException {
+ while (len > 0) {
+ if (bufPosition >= bufLength) {
+ currBufIdx++;
+
+ switchCurrentBuffer(true);
+ }
+
+ int remainInBuf = bufLength - bufPosition;
+ int bytesToCp = len < remainInBuf ? len : remainInBuf;
+
+ mem.readBytes(currBuf + bufPosition, b, offset, bytesToCp);
+
+ offset += bytesToCp;
+ len -= bytesToCp;
+
+ bufPosition += bytesToCp;
+ }
+ }
+
+ /**
+ * Makes the buffer with index {@code currBufIdx} current and resets the position inside it.
+ *
+ * @param enforceEOF if we need to enforce {@link EOFException} when the index is past the last buffer.
+ * @throws IOException if failed.
+ */
+ private void switchCurrentBuffer(boolean enforceEOF) throws IOException {
+ bufStart = (long)BUFFER_SIZE * (long)currBufIdx;
+
+ if (currBufIdx >= file.numBuffers()) {
+ // end of file reached, no more buffers left
+ if (enforceEOF)
+ throw new EOFException("read past EOF: " + this);
+
+ // Force EOF if a read takes place at this position
+ currBufIdx--;
+ bufPosition = BUFFER_SIZE;
+ }
+ else {
+ currBuf = file.getBuffer(currBufIdx);
+ bufPosition = 0;
+
+ long buflen = length - bufStart;
+
+ bufLength = buflen > BUFFER_SIZE ? BUFFER_SIZE : (int)buflen; // The last buffer may be only partially filled.
+ }
+ }
+
+ /** {@inheritDoc} Hands buffer pointers straight to {@code out} when it is a {@link GridLuceneOutputStream}. */
+ @Override public void copyBytes(IndexOutput out, long numBytes) throws IOException {
+ assert numBytes >= 0 : "numBytes=" + numBytes;
+
+ GridLuceneOutputStream gridOut = out instanceof GridLuceneOutputStream ? (GridLuceneOutputStream)out : null;
+
+ long left = numBytes;
+
+ while (left > 0) {
+ if (bufPosition == bufLength) {
+ ++currBufIdx;
+
+ switchCurrentBuffer(true);
+ }
+
+ final int bytesInBuf = bufLength - bufPosition;
+ final int toCp = (int)(bytesInBuf < left ? bytesInBuf : left);
+
+ if (gridOut != null)
+ gridOut.writeBytes(currBuf + bufPosition, toCp);
+ else {
+ byte[] buff = new byte[toCp]; // Other outputs get the chunk through a temporary array.
+
+ mem.readBytes(currBuf + bufPosition, buff);
+
+ out.writeBytes(buff, toCp);
+ }
+
+ bufPosition += toCp;
+
+ left -= toCp;
+ }
+
+ assert left == 0 : "Insufficient bytes to copy: numBytes=" + numBytes + " copied=" + (numBytes - left);
+ }
+
+ /**
+ * For direct calls from {@link GridLuceneOutputStream}: copies bytes from the current position to {@code ptr}.
+ *
+ * @param ptr Pointer the bytes are copied to.
+ * @param len Number of bytes to copy.
+ * @throws IOException If failed.
+ */
+ void readBytes(long ptr, int len) throws IOException {
+ while (len > 0) {
+ if (bufPosition >= bufLength) {
+ currBufIdx++;
+
+ switchCurrentBuffer(true);
+ }
+
+ int remainInBuf = bufLength - bufPosition;
+ int bytesToCp = len < remainInBuf ? len : remainInBuf;
+
+ mem.copyMemory(currBuf + bufPosition, ptr, bytesToCp);
+
+ ptr += bytesToCp;
+ len -= bytesToCp;
+
+ bufPosition += bytesToCp;
+ }
+ }
+
+ /** {@inheritDoc} Returns {@code 0} while no buffer has been selected yet. */
+ @Override public long getFilePointer() {
+ return currBufIdx < 0 ? 0 : bufStart + bufPosition;
+ }
+
+ /** {@inheritDoc} Switches buffers only if none is selected yet or {@code pos} lies outside the current buffer. */
+ @Override public void seek(long pos) throws IOException {
+ if (currBuf == 0 || pos < bufStart || pos >= bufStart + BUFFER_SIZE) {
+ currBufIdx = (int)(pos / BUFFER_SIZE);
+
+ switchCurrentBuffer(false);
+ }
+
+ bufPosition = (int)(pos % BUFFER_SIZE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneLockFactory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneLockFactory.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneLockFactory.java
new file mode 100644
index 0000000..34d92ee
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneLockFactory.java
@@ -0,0 +1,64 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+import org.apache.lucene.store.*;
+import org.gridgain.grid.util.*;
+
+import java.io.*;
+
+/**
+ * Lucene {@link LockFactory} implementation that tracks held locks by name in an in-memory set.
+ */
+public class GridLuceneLockFactory extends LockFactory {
+ /** Names of the locks currently held. */
+ @SuppressWarnings("TypeMayBeWeakened")
+ private final GridConcurrentHashSet<String> locks = new GridConcurrentHashSet<>();
+
+ /** {@inheritDoc} Creates a new lock object for the name without acquiring it. */
+ @Override public Lock makeLock(String lockName) {
+ return new LockImpl(lockName);
+ }
+
+ /** {@inheritDoc} Removes the name from the set of held locks. */
+ @Override public void clearLock(String lockName) throws IOException {
+ locks.remove(lockName);
+ }
+
+ /**
+ * {@link Lock} Implementation backed by the enclosing factory's set of lock names.
+ */
+ private class LockImpl extends Lock {
+ /** Name of this lock. */
+ private final String lockName;
+
+ /**
+ * @param lockName Lock name.
+ */
+ private LockImpl(String lockName) {
+ this.lockName = lockName;
+ }
+
+ /** {@inheritDoc} Returns the result of adding the name to the set of held locks. */
+ @Override public boolean obtain() throws IOException {
+ return locks.add(lockName);
+ }
+
+ /** {@inheritDoc} Removes the name from the set of held locks. */
+ @Override public void release() throws IOException {
+ locks.remove(lockName);
+ }
+
+ /** {@inheritDoc} True while the name is present in the set of held locks. */
+ @Override public boolean isLocked() throws IOException {
+ return locks.contains(lockName);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneOutputStream.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneOutputStream.java
new file mode 100644
index 0000000..ff5abf2
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneOutputStream.java
@@ -0,0 +1,230 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+import org.apache.lucene.store.*;
+import org.gridgain.grid.util.offheap.unsafe.*;
+
+import java.io.IOException;
+
+/**
+ * A memory-resident {@link IndexOutput} implementation.
+ */
+public class GridLuceneOutputStream extends IndexOutput {
+ /** Off-heap page size (32 KB); also statically imported by {@code GridLuceneInputStream}. */
+ static final int BUFFER_SIZE = 32 * 1024;
+
+ /** File being written. */
+ private GridLuceneFile file;
+
+ /** Pointer to the current buffer as returned by the file; {@code 0} until a buffer is selected. */
+ private long currBuf;
+
+ /** Index of the current buffer within the file; {@code -1} until the first buffer is selected. */
+ private int currBufIdx;
+
+ /** Write position inside the current buffer. */
+ private int bufPosition;
+
+ /** File offset at which the current buffer starts ({@code BUFFER_SIZE * currBufIdx}). */
+ private long bufStart;
+
+ /** Size of the current buffer: {@code BUFFER_SIZE} once a buffer is selected, {@code 0} before that. */
+ private int bufLength;
+
+ /** Memory through which buffer contents are written; taken from the file's directory. */
+ private final GridUnsafeMemory mem;
+
+ /**
+ * Constructor; the first buffer is selected lazily on the first write.
+ *
+ * @param f File.
+ */
+ public GridLuceneOutputStream(GridLuceneFile f) {
+ file = f;
+
+ mem = f.getDirectory().memory();
+
+ // make sure that we switch to the
+ // first needed buffer lazily
+ currBufIdx = -1;
+ currBuf = 0;
+ }
+
+ /**
+ * Resets this to an empty file: clears the buffer cursor fields and sets the file length to zero.
+ */
+ public void reset() {
+ currBuf = 0;
+ currBufIdx = -1;
+ bufPosition = 0;
+ bufStart = 0;
+ bufLength = 0;
+
+ file.setLength(0);
+ }
+
+ /** {@inheritDoc} Only calls {@link #flush()}. */
+ @Override public void close() throws IOException {
+ flush();
+ }
+
+ /** {@inheritDoc} Records the file length first, then switches buffers if {@code pos} is outside the current one. */
+ @Override public void seek(long pos) throws IOException {
+ // set the file length in case we seek back
+ // and flush() has not been called yet
+ setFileLength();
+
+ if (pos < bufStart || pos >= bufStart + bufLength) {
+ currBufIdx = (int)(pos / BUFFER_SIZE);
+
+ switchCurrentBuffer();
+ }
+
+ bufPosition = (int)(pos % BUFFER_SIZE);
+ }
+
+ /** {@inheritDoc} Returns the length currently recorded on the file. */
+ @Override public long length() {
+ return file.getLength();
+ }
+
+ /** {@inheritDoc} Moves to the next buffer first when the current one is full. */
+ @Override public void writeByte(byte b) throws IOException {
+ if (bufPosition == bufLength) {
+ currBufIdx++;
+
+ switchCurrentBuffer();
+ }
+
+ mem.writeByte(currBuf + bufPosition++, b);
+ }
+
+ /** {@inheritDoc} Writes in chunks, each limited to the space left in the current buffer. */
+ @Override public void writeBytes(byte[] b, int offset, int len) throws IOException {
+ assert b != null;
+
+ while (len > 0) {
+ if (bufPosition == bufLength) {
+ currBufIdx++;
+
+ switchCurrentBuffer();
+ }
+
+ int remainInBuf = BUFFER_SIZE - bufPosition;
+ int bytesToCp = len < remainInBuf ? len : remainInBuf;
+
+ mem.writeBytes(currBuf + bufPosition, b, offset, bytesToCp);
+
+ offset += bytesToCp;
+ len -= bytesToCp;
+
+ bufPosition += bytesToCp;
+ }
+ }
+
+ /**
+ * Makes buffer {@code currBufIdx} current, adding a new buffer to the file when the index equals the buffer count.
+ */
+ private void switchCurrentBuffer() {
+ currBuf = currBufIdx == file.numBuffers() ? file.addBuffer() : file.getBuffer(currBufIdx);
+
+ bufPosition = 0;
+ bufStart = (long)BUFFER_SIZE * (long)currBufIdx;
+ bufLength = BUFFER_SIZE;
+ }
+
+ /**
+ * Sets file length to the current write position if that position is beyond the recorded length.
+ */
+ private void setFileLength() {
+ long pointer = bufStart + bufPosition;
+
+ if (pointer > file.getLength())
+ file.setLength(pointer);
+ }
+
+ /** {@inheritDoc} Only updates the file length; see {@code setFileLength()}. */
+ @Override public void flush() throws IOException {
+ setFileLength();
+ }
+
+ /** {@inheritDoc} Returns {@code 0} while no buffer has been selected yet. */
+ @Override public long getFilePointer() {
+ return currBufIdx < 0 ? 0 : bufStart + bufPosition;
+ }
+
+ /**
+ * Returns byte usage of all buffers, i.e. the file's buffer count multiplied by {@code BUFFER_SIZE}.
+ *
+ * @return Bytes used.
+ */
+ public long sizeInBytes() {
+ return (long)file.numBuffers() * (long)BUFFER_SIZE;
+ }
+
+ /** {@inheritDoc} Hands buffer pointers straight to {@code input} when it is a {@link GridLuceneInputStream}. */
+ @Override public void copyBytes(DataInput input, long numBytes) throws IOException {
+ assert numBytes >= 0 : "numBytes=" + numBytes;
+
+ GridLuceneInputStream gridInput = input instanceof GridLuceneInputStream ? (GridLuceneInputStream)input : null;
+
+ while (numBytes > 0) {
+ if (bufPosition == bufLength) {
+ currBufIdx++;
+
+ switchCurrentBuffer();
+ }
+
+ int toCp = BUFFER_SIZE - bufPosition;
+
+ if (numBytes < toCp)
+ toCp = (int)numBytes;
+
+ if (gridInput != null)
+ gridInput.readBytes(currBuf + bufPosition, toCp);
+ else {
+ byte[] buff = new byte[toCp]; // Other inputs are read through a temporary array.
+
+ input.readBytes(buff, 0, toCp, false);
+
+ mem.writeBytes(currBuf + bufPosition, buff);
+ }
+
+ numBytes -= toCp;
+ bufPosition += toCp;
+ }
+ }
+
+ /**
+ * For direct usage by {@link GridLuceneInputStream}: copies bytes from {@code ptr} to the current position.
+ *
+ * @param ptr Pointer the bytes are copied from.
+ * @param len Number of bytes to copy.
+ * @throws IOException If failed.
+ */
+ void writeBytes(long ptr, int len) throws IOException {
+ while (len > 0) {
+ if (bufPosition == bufLength) {
+ currBufIdx++;
+ switchCurrentBuffer();
+ }
+
+ int remainInBuf = BUFFER_SIZE - bufPosition;
+ int bytesToCp = len < remainInBuf ? len : remainInBuf;
+
+ mem.copyMemory(ptr, currBuf + bufPosition, bytesToCp);
+
+ ptr += bytesToCp;
+ len -= bytesToCp;
+ bufPosition += bytesToCp;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridSearchRowPointer.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridSearchRowPointer.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridSearchRowPointer.java
new file mode 100644
index 0000000..4f11bfc
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridSearchRowPointer.java
@@ -0,0 +1,20 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+import org.gridgain.grid.util.offheap.unsafe.*;
+import org.h2.result.*;
+
+/**
+ * Search row which supports pointer operations.
+ */
+public interface GridSearchRowPointer extends SearchRow, GridOffHeapSmartPointer {
+ // No-op: declares nothing beyond the two parent interfaces.
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/package.html
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/package.html b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/package.html
new file mode 100644
index 0000000..1a7d215
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/package.html
@@ -0,0 +1,15 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<!--
+ @html.file.header
+ _________ _____ __________________ _____
+ __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+-->
+<html>
+<body>
+ <!-- Package description. -->
+ Contains <b>default</b> H2-based indexing SPI implementation.
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/spi/indexing/h2/GridH2IndexingSpaceConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/spi/indexing/h2/GridH2IndexingSpaceConfiguration.java b/modules/indexing/src/main/java/org/gridgain/grid/spi/indexing/h2/GridH2IndexingSpaceConfiguration.java
deleted file mode 100644
index 1ba7a5d..0000000
--- a/modules/indexing/src/main/java/org/gridgain/grid/spi/indexing/h2/GridH2IndexingSpaceConfiguration.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.spi.indexing.h2;
-
-import org.gridgain.grid.util.typedef.internal.*;
-
-/**
- * Defines per-space configuration properties for {@link GridH2IndexingSpi}.
- */
-public class GridH2IndexingSpaceConfiguration {
- /** */
- private String name;
-
- /** */
- private boolean idxPrimitiveKey;
-
- /** */
- private boolean idxPrimitiveVal;
-
- /** */
- private boolean idxFixedTyping;
-
- /** */
- private boolean escapeAll;
-
- /**
- * Gets space name to which this configuration applies.
- *
- * @return Space name.
- */
- public String getName() {
- return name;
- }
-
- /**
- * Sets space name.
- *
- * @param name Space name.
- */
- public void setName(String name) {
- this.name = name;
- }
-
- /**
- * Gets flag indicating whether indexing SPI should index by key in cases
- * where key is primitive type
- *
- * @return {@code True} if primitive keys should be indexed.
- */
- public boolean isIndexPrimitiveKey() {
- return idxPrimitiveKey;
- }
-
- /**
- * Sets flag indicating whether indexing SPI should index by key in cases
- * where key is primitive type.
- *
- * @param idxPrimitiveKey {@code True} if primitive keys should be indexed.
- */
- public void setIndexPrimitiveKey(boolean idxPrimitiveKey) {
- this.idxPrimitiveKey = idxPrimitiveKey;
- }
-
- /**
- * Gets flag indicating whether indexing SPI should index by value in cases
- * where value is primitive type
- *
- * @return {@code True} if primitive values should be indexed.
- */
- public boolean isIndexPrimitiveValue() {
- return idxPrimitiveVal;
- }
-
- /**
- * Sets flag indexing whether indexing SPI should index by value in cases
- * where value is primitive type.
- *
- * @param idxPrimitiveVal {@code True} if primitive values should be indexed.
- */
- public void setIndexPrimitiveValue(boolean idxPrimitiveVal) {
- this.idxPrimitiveVal = idxPrimitiveVal;
- }
-
- /**
- * This flag essentially controls whether all values of the same type have
- * identical key type.
- * <p>
- * If {@code false}, SPI will store all keys in BINARY form to make it possible to store
- * the same value type with different key types. If {@code true}, key type will be converted
- * to respective SQL type if it is possible, hence, improving performance of queries.
- * <p>
- * Setting this value to {@code false} also means that {@code '_key'} column cannot be indexed and
- * cannot participate in query where clauses. The behavior of using '_key' column in where
- * clauses with this flag set to {@code false} is undefined.
- *
- * @return {@code True} if SPI should try to convert values to their respective SQL
- * types for better performance.
- */
- public boolean isIndexFixedTyping() {
- return idxFixedTyping;
- }
-
- /**
- * This flag essentially controls whether key type is going to be identical
- * for all values of the same type.
- * <p>
- * If false, SPI will store all keys in BINARY form to make it possible to store
- * the same value type with different key types. If true, key type will be converted
- * to respective SQL type if it is possible, which may provide significant performance
- * boost.
- *
- * @param idxFixedTyping {@code True} if SPI should try to convert values to their respective SQL
- * types for better performance.
- */
- public void setIndexFixedTyping(boolean idxFixedTyping) {
- this.idxFixedTyping = idxFixedTyping;
- }
-
- /**
- * If {@code true}, then table name and all column names in 'create table' SQL
- * generated by SPI are escaped with double quotes. This flag should be set if table name of
- * column name is H2 reserved word or is not valid H2 identifier (e.g. contains space or hyphen).
- * <p>
- * Note if this flag is set then table and column name in SQL queries also must be escaped with double quotes.
-
- * @return Flag value.
- */
- public boolean isEscapeAll() {
- return escapeAll;
- }
-
- /**
- * If {@code true}, then table name and all column names in 'create table' SQL
- * generated by SPI are escaped with double quotes. This flag should be set if table name of
- * column name is H2 reserved word or is not valid H2 identifier (e.g. contains space or hyphen).
- * <p>
- * Note if this flag is set then table and column name in SQL queries also must be escaped with double quotes.
-
- * @param escapeAll Flag value.
- */
- public void setEscapeAll(boolean escapeAll) {
- this.escapeAll = escapeAll;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridH2IndexingSpaceConfiguration.class, this);
- }
-}