You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2014/12/09 17:33:17 UTC
[07/13] incubator-ignite git commit: ignite-qry - merged
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
new file mode 100644
index 0000000..90fa24d
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
@@ -0,0 +1,346 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+import org.apache.ignite.spi.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.offheap.unsafe.*;
+import org.h2.store.*;
+import org.h2.value.*;
+import org.jetbrains.annotations.*;
+
+import java.util.concurrent.locks.*;
+
+/**
+ * Offheap row.
+ */
+public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
+ /** Striped lock shared by all off-heap rows; a stripe is picked by row pointer in {@link #lock(long)}. */
+ private static final GridStripedLock lock;
+
+ /**
+ * Init locks. The number of stripes is {@code cpus * cpus * 8}.
+ */
+ static {
+ int cpus = Runtime.getRuntime().availableProcessors();
+
+ lock = new GridStripedLock(cpus * cpus * 8);
+ }
+
+ /** Offset of the key size (int) in the row page; the int at offset 0 is the reference counter. */
+ private static final int OFFSET_KEY_SIZE = 4; // 4 after ref cnt int
+
+ /** Offset of the pointer (long) to the separately allocated value page; 0 means the value is absent. */
+ private static final int OFFSET_VALUE_REF = OFFSET_KEY_SIZE + 4; // 8
+
+ /** Offset of the expiration time (long) in the row page. */
+ private static final int OFFSET_EXPIRATION = OFFSET_VALUE_REF + 8; // 16
+
+ /** Offset of the serialized key bytes in the row page. */
+ private static final int OFFSET_KEY = OFFSET_EXPIRATION + 8; // 24
+
+ /** Offset of the serialized value bytes in the value page. */
+ private static final int OFFSET_VALUE = 4; // 4 on separate page after val size int
+
+ /** Data instance used only to compute the serialized length of values. */
+ private static final Data SIZE_CALCULATOR = Data.create(null, null);
+
+ /** Pointer to the off-heap row page; 0 for a row that has not been serialized off-heap yet. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private long ptr;
+
+    /**
+     * Creates a row over an existing off-heap pointer.
+     *
+     * @param desc Row descriptor.
+     * @param ptr Pointer, expected to be positive.
+     */
+    public GridH2KeyValueRowOffheap(GridH2RowDescriptor desc, long ptr) {
+        super(desc);
+
+        this.ptr = ptr;
+
+        assert ptr > 0 : ptr;
+    }
+
+ /**
+ * Constructor for a row built from on-heap key and value.
+ * <p>
+ * The pointer field is not assigned here; the off-heap pages are allocated on the first call to
+ * {@link #incrementRefCount()}.
+ *
+ * @param desc Row descriptor.
+ * @param key Key.
+ * @param keyType Key type.
+ * @param val Value.
+ * @param valType Value type.
+ * @param expirationTime Expiration time.
+ * @throws IgniteSpiException If failed.
+ */
+ public GridH2KeyValueRowOffheap(GridH2RowDescriptor desc, Object key, int keyType, @Nullable Object val, int valType,
+ long expirationTime) throws IgniteSpiException {
+ super(desc, key, keyType, val, valType, expirationTime);
+ }
+
+    /** {@inheritDoc} */
+    @Override public long expirationTime() {
+        if (expirationTime != 0)
+            return expirationTime; // Already known on heap.
+
+        long rowPtr = ptr;
+
+        assert rowPtr > 0 : rowPtr;
+
+        // Lazily load the value from the row page. We don't need any synchronization or volatility here
+        // because we publish via volatile write to tree node.
+        expirationTime = desc.memory().readLong(rowPtr + OFFSET_EXPIRATION);
+
+        return expirationTime;
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Registers this row in the descriptor's cache.
+ */
+ @Override protected void cache() {
+ desc.cache(this);
+ }
+
+    /**
+     * Acquires the stripe of the shared striped lock that corresponds to the given pointer.
+     *
+     * @param ptr Pointer to get lock for.
+     * @return Locked lock, must be released in {@code finally} block.
+     */
+    @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+    private static Lock lock(long ptr) {
+        assert (ptr & 7) == 0 : ptr; // Unsafe allocated pointers aligned.
+
+        // The three low bits are zero (see the assertion above), so they are dropped before picking a stripe.
+        Lock stripe = lock.getLock(ptr >>> 3);
+
+        stripe.lock();
+
+        return stripe;
+    }
+
+    /**
+     * Reads the serialized key or value of this row from off-heap memory and deserializes it.
+     * <p>
+     * The key is read from the row page without locking. The value is read from the separate value
+     * page under the pointer stripe lock and inside a guard section; both are released even if
+     * entering or leaving the guard section fails.
+     *
+     * @param col Column to read, {@code KEY_COL} or {@code VAL_COL}.
+     * @return Deserialized value, or {@code null} if the value column was requested while the value
+     *      reference is 0 (value was evicted).
+     * @throws IllegalArgumentException If {@code col} is neither the key nor the value column
+     *      (an assertion error is raised first when assertions are enabled).
+     */
+    @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+    @Override protected Value getOffheapValue(int col) {
+        GridUnsafeMemory mem = desc.memory();
+
+        long p = ptr;
+
+        assert p > 0 : p;
+
+        byte[] bytes;
+
+        if (col == KEY_COL) {
+            int size = mem.readInt(p + OFFSET_KEY_SIZE);
+
+            assert size > 0 : size;
+
+            bytes = mem.readBytes(p + OFFSET_KEY, size);
+        }
+        else if (col == VAL_COL) {
+            Lock l = lock(p);
+
+            try {
+                // Entering the guard inside the try keeps the stripe lock from leaking if begin() throws.
+                desc.guard().begin();
+
+                try {
+                    long valPtr = mem.readLongVolatile(p + OFFSET_VALUE_REF);
+
+                    if (valPtr == 0) // Value was evicted.
+                        return null;
+
+                    int size = mem.readInt(valPtr);
+
+                    assert size > 0 : size;
+
+                    bytes = mem.readBytes(valPtr + OFFSET_VALUE, size);
+                }
+                finally {
+                    desc.guard().end();
+                }
+            }
+            finally {
+                l.unlock();
+            }
+        }
+        else {
+            assert false : col;
+
+            // Fail with a clear message instead of deserializing a null array when assertions are off.
+            throw new IllegalArgumentException("Unexpected column: " + col);
+        }
+
+        Data data = Data.create(null, bytes);
+
+        return data.readValue();
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the off-heap row pointer; the row is expected to be already stored off-heap (positive pointer).
+ */
+ @Override public long pointer() {
+ // Read the field once so that the asserted and the returned values are the same.
+ long p = ptr;
+
+ assert p > 0: p;
+
+ return p;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Under the pointer stripe lock reads the current value page pointer and registers a deferred finalizer
+ * on the descriptor guard. The finalizer resets the value reference to 0 (only if it still points to
+ * the page read here) and releases that page.
+ */
+ @Override public synchronized void onSwap() throws GridException {
+ Lock l = lock(ptr);
+
+ try {
+ // Address of the value reference inside the row page.
+ final long p = ptr + OFFSET_VALUE_REF;
+
+ final GridUnsafeMemory mem = desc.memory();
+
+ final long valPtr = mem.readLongVolatile(p);
+
+ assert valPtr > 0: valPtr;
+
+ desc.guard().finalizeLater(new Runnable() {
+ @Override public void run() {
+ mem.casLong(p, valPtr, 0); // If it was unswapped concurrently we will not update.
+
+ // The page read above is released regardless of the CAS outcome.
+ mem.release(valPtr, mem.readInt(valPtr) + OFFSET_VALUE);
+ }
+ });
+ }
+ finally {
+ l.unlock();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This implementation performs no update and returns {@code exp} as is; {@code upd} is ignored.
+ */
+ @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
+ @Override protected Value updateWeakValue(Value exp, Value upd) {
+ return exp;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * After the superclass handling, serializes the value column, copies it into a newly allocated value
+ * page and publishes the page pointer into the row page under the pointer stripe lock.
+ */
+ @Override public synchronized void onUnswap(Object val) throws GridException {
+ super.onUnswap(val);
+
+ Value v = getValue(VAL_COL);
+
+ // Serialize the value on heap first so that the lock below is held only for the copy.
+ byte[] bytes = new byte[SIZE_CALCULATOR.getValueLen(v)];
+
+ Data data = Data.create(null, bytes);
+
+ data.writeValue(v);
+
+ long p = ptr;
+
+ assert p > 0 : p;
+
+ Lock l = lock(p);
+
+ try {
+ GridUnsafeMemory mem = desc.memory();
+
+ // Value page layout: size int followed by the serialized bytes.
+ long valPtr = mem.allocate(bytes.length + OFFSET_VALUE);
+
+ mem.writeInt(valPtr, bytes.length);
+ mem.writeBytes(valPtr + OFFSET_VALUE, bytes);
+
+ // The previous reference is overwritten without being read or released here.
+ mem.writeLongVolatile(p + OFFSET_VALUE_REF, valPtr);
+ }
+ finally {
+ l.unlock();
+ }
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Falls back to reading the value from off-heap memory when the superclass has none.
+     */
+    @Override protected synchronized Value syncValue() {
+        Value onheap = super.syncValue();
+
+        return onheap != null ? onheap : getOffheapValue(VAL_COL);
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * If the row has no off-heap pointer yet, serializes key and value into newly allocated pages with an
+ * initial reference count of 1, stores the pointer in this row and caches the row in the descriptor.
+ * Otherwise atomically increments the reference counter stored at the start of the row page.
+ */
+ @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext"})
+ @Override public void incrementRefCount() {
+ long p = ptr;
+
+ GridUnsafeMemory mem = desc.memory();
+
+ if (p == 0) { // Serialize data to offheap memory.
+ Value key = getValue(KEY_COL);
+ Value val = getValue(VAL_COL);
+
+ assert key != null;
+ assert val != null;
+
+ Data data = Data.create(null, new byte[SIZE_CALCULATOR.getValueLen(key)]);
+
+ data.writeValue(key);
+
+ int keySize = data.length();
+
+ // Row page: ref count, key size, value reference, expiration time, key bytes.
+ p = mem.allocate(keySize + OFFSET_KEY);
+
+ // We don't need any synchronization or volatility here because we publish via
+ // volatile write to tree node.
+ mem.writeInt(p, 1);
+ mem.writeLong(p + OFFSET_EXPIRATION, expirationTime);
+ mem.writeInt(p + OFFSET_KEY_SIZE, keySize);
+ mem.writeBytes(p + OFFSET_KEY, data.getBytes(), 0, keySize);
+
+ data = Data.create(null, new byte[SIZE_CALCULATOR.getValueLen(val)]);
+
+ data.writeValue(val);
+
+ int valSize = data.length();
+
+ // Value page: size int followed by the serialized value bytes.
+ long valPtr = mem.allocate(valSize + OFFSET_VALUE);
+
+ mem.writeInt(valPtr, valSize);
+ mem.writeBytes(valPtr + OFFSET_VALUE, data.getBytes(), 0, valSize);
+
+ mem.writeLongVolatile(p + OFFSET_VALUE_REF, valPtr);
+
+ ptr = p;
+
+ desc.cache(this);
+ }
+ else {
+ // CAS loop: retry until the counter is incremented without a concurrent change in between.
+ for (;;) {
+ int cnt = mem.readIntVolatile(p);
+
+ assert cnt > 0 : cnt;
+
+ if (mem.casInt(p, cnt, cnt + 1))
+ break;
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Atomically decrements the reference counter while it is greater than 1. When the counter is 1 the row
+ * is removed from the descriptor cache and both the value page (if present) and the row page are released.
+ */
+ @Override public void decrementRefCount() {
+ long p = ptr;
+
+ assert p > 0 : p;
+
+ GridUnsafeMemory mem = desc.memory();
+
+ for (;;) {
+ int cnt = mem.readIntVolatile(p);
+
+ assert cnt > 0 : cnt;
+
+ // Last reference: leave the loop and deallocate below.
+ // NOTE(review): the counter is not changed on this path; presumably callers guarantee that
+ // no concurrent increment can happen for the last reference - confirm.
+ if (cnt == 1)
+ break;
+
+ if (mem.casInt(p, cnt, cnt - 1))
+ return;
+ }
+
+ desc.uncache(p);
+
+ // Deallocate off-heap memory.
+ long valPtr = mem.readLongVolatile(p + OFFSET_VALUE_REF);
+
+ assert valPtr >= 0 : valPtr;
+
+ // Zero reference means there is no value page to release.
+ if (valPtr != 0)
+ mem.release(valPtr, mem.readInt(valPtr) + OFFSET_VALUE);
+
+ mem.release(p, mem.readInt(p + OFFSET_KEY_SIZE) + OFFSET_KEY);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
new file mode 100644
index 0000000..22cd94e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
@@ -0,0 +1,46 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+
+import org.apache.ignite.spi.*;
+import org.h2.value.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Onheap row. Its {@link #cache()} is a no-op and it never returns an off-heap value.
+ */
+public class GridH2KeyValueRowOnheap extends GridH2AbstractKeyValueRow {
+ /**
+ * Constructor, delegates all arguments to the superclass.
+ *
+ * @param desc Row descriptor.
+ * @param key Key.
+ * @param keyType Key type.
+ * @param val Value.
+ * @param valType Value type.
+ * @param expirationTime Expiration time.
+ * @throws IgniteSpiException If failed.
+ */
+ public GridH2KeyValueRowOnheap(GridH2RowDescriptor desc, Object key, int keyType, @Nullable Object val, int valType,
+ long expirationTime) throws IgniteSpiException {
+ super(desc, key, keyType, val, valType, expirationTime);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void cache() {
+ // No-op.
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return Always {@code null}.
+ */
+ @Override protected Value getOffheapValue(int col) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Row.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Row.java
new file mode 100644
index 0000000..3a88938
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Row.java
@@ -0,0 +1,40 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+import org.h2.result.*;
+import org.h2.value.*;
+
+/**
+ * Row with locking support needed for unique key conflicts resolution.
+ * <p>
+ * This base implementation does not support pointers or reference counting: the corresponding
+ * methods throw {@link IllegalStateException}.
+ */
+public class GridH2Row extends Row implements GridSearchRowPointer {
+ /**
+ * @param data Column values.
+ */
+ public GridH2Row(Value... data) {
+ super(data, MEMORY_CALCULATE);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws IllegalStateException Always.
+ */
+ @Override public long pointer() {
+ throw new IllegalStateException();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws IllegalStateException Always.
+ */
+ @Override public void incrementRefCount() {
+ throw new IllegalStateException();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws IllegalStateException Always.
+ */
+ @Override public void decrementRefCount() {
+ throw new IllegalStateException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2RowDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2RowDescriptor.java
new file mode 100644
index 0000000..685a128
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -0,0 +1,102 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.processors.query.h2.*;
+import org.gridgain.grid.util.offheap.unsafe.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Row descriptor. Creates rows and gives them access to column metadata, swap and the off-heap
+ * memory, guard and row cache.
+ */
+public interface GridH2RowDescriptor extends GridOffHeapSmartPointerFactory<GridH2KeyValueRowOffheap> {
+ /**
+ * @return Owner.
+ */
+ public GridH2Indexing owner();
+
+ /**
+ * Creates new row.
+ *
+ * @param key Key.
+ * @param val Value, may be {@code null}.
+ * @param expirationTime Expiration time in millis.
+ * @return Row.
+ * @throws GridException If failed.
+ */
+ public GridH2AbstractKeyValueRow createRow(Object key, @Nullable Object val, long expirationTime)
+ throws GridException;
+
+ /**
+ * Reads the value for the given key from swap.
+ *
+ * @param key Cache key.
+ * @return Value.
+ * @throws GridException If failed.
+ */
+ public Object readFromSwap(Object key) throws GridException;
+
+ /**
+ * @return Value type.
+ */
+ public int valueType();
+
+ /**
+ * @return {@code true} If we need to store {@code toString()} of value.
+ */
+ public boolean valueToString();
+
+ /**
+ * @return Total fields count.
+ */
+ public int fieldsCount();
+
+ /**
+ * Gets value type for column index.
+ *
+ * @param col Column index.
+ * @return Value type.
+ */
+ public int fieldType(int col);
+
+ /**
+ * Gets column value by column index.
+ *
+ * @param obj Object to extract value from.
+ * @param col Column index.
+ * @return Column value.
+ */
+ public Object columnValue(Object obj, int col);
+
+ /**
+ * @param col Column index.
+ * @return {@code True} if column relates to key, false if it relates to value.
+ */
+ public boolean isKeyColumn(int col);
+
+ /**
+ * @return Unsafe memory. Callers in this package treat {@code null} as "off-heap is not used".
+ */
+ public GridUnsafeMemory memory();
+
+ /**
+ * @param row Deserialized offheap row to cache in heap.
+ */
+ public void cache(GridH2KeyValueRowOffheap row);
+
+ /**
+ * @param ptr Offheap pointer to remove from cache.
+ */
+ public void uncache(long ptr);
+
+ /**
+ * @return Guard used to bracket off-heap memory access and to defer deallocation.
+ */
+ public GridUnsafeGuard guard();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2SpatialIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2SpatialIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2SpatialIndex.java
new file mode 100644
index 0000000..20aab15
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2SpatialIndex.java
@@ -0,0 +1,318 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+import com.vividsolutions.jts.geom.*;
+import org.h2.engine.*;
+import org.h2.index.*;
+import org.h2.index.Cursor;
+import org.h2.message.*;
+import org.h2.mvstore.*;
+import org.h2.mvstore.rtree.*;
+import org.h2.result.*;
+import org.h2.table.*;
+import org.h2.value.*;
+
+import java.util.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Spatial index.
+ */
+public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex {
+ /** Lock: the write lock is taken by put, remove and close, the read lock by lookups. */
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /** Number of rows currently stored in the index. */
+ private volatile long rowCnt;
+
+ /** Last assigned row id, incremented for every new key. */
+ private long rowIds;
+
+ /** Set to {@code true} by {@link #close(Session)}. */
+ private boolean closed;
+
+ /** R-tree mapping spatial keys (row envelope plus row id) to row ids. */
+ private final MVRTreeMap<Long> treeMap;
+
+ /** Rows by row id. */
+ private final Map<Long, GridH2Row> idToRow = new HashMap<>();
+
+ /** Row ids by key column value. */
+ private final Map<Value, Long> keyToId = new HashMap<>();
+
+ /** In-memory store that backs {@link #treeMap}. */
+ private final MVStore store;
+
+    /**
+     * Creates an in-memory spatial index over a single geometry column.
+     *
+     * @param tbl Table.
+     * @param idxName Index name.
+     * @param cols Columns; more than one column, descending order and explicit nulls ordering are rejected.
+     * @param keyCol Key column.
+     * @param valCol Value column.
+     */
+    public GridH2SpatialIndex(Table tbl, String idxName, IndexColumn[] cols, int keyCol, int valCol) {
+        super(keyCol, valCol);
+
+        if (cols.length > 1)
+            throw DbException.getUnsupportedException("can only do one column");
+
+        int sortType = cols[0].sortType;
+
+        if ((sortType & SortOrder.DESCENDING) != 0)
+            throw DbException.getUnsupportedException("cannot do descending");
+
+        if ((sortType & SortOrder.NULLS_FIRST) != 0)
+            throw DbException.getUnsupportedException("cannot do nulls first");
+
+        if ((sortType & SortOrder.NULLS_LAST) != 0)
+            throw DbException.getUnsupportedException("cannot do nulls last");
+
+        initBaseIndex(tbl, 0, idxName, cols, IndexType.createNonUnique(false, false, true));
+
+        table = tbl;
+
+        Column geomCol = cols[0].column;
+
+        if (geomCol.getType() != Value.GEOMETRY) {
+            throw DbException.getUnsupportedException("spatial index on non-geometry column, " +
+                geomCol.getCreateSQL());
+        }
+
+        // Index in memory
+        store = MVStore.open(null);
+        treeMap = store.openMap("spatialIndex", new MVRTreeMap.Builder<Long>());
+    }
+
+ /**
+ * Check closed: raises an H2 internal error if {@link #close(Session)} has already been called.
+ * Callers in this class invoke it while holding the index lock.
+ */
+ private void checkClosed() {
+ if (closed)
+ throw DbException.throwInternalError();
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Rows are identified by the key column: a row with an already known key reuses its row id and
+     * replaces the previously stored row.
+     */
+    @Override public GridH2Row put(GridH2Row row) {
+        Lock writeLock = lock.writeLock();
+
+        writeLock.lock();
+
+        try {
+            checkClosed();
+
+            Value key = row.getValue(keyCol);
+
+            assert key != null;
+
+            Long rowId = keyToId.get(key);
+
+            if (rowId == null) {
+                // New key: assign the next row id.
+                rowId = ++rowIds;
+
+                keyToId.put(key, rowId);
+            }
+            else {
+                // Known key: drop the tree entry built from the envelope of the stored row.
+                Long oldRowId = treeMap.remove(getEnvelope(idToRow.get(rowId), rowId));
+
+                assert rowId.equals(oldRowId);
+            }
+
+            GridH2Row replaced = idToRow.put(rowId, row);
+
+            treeMap.put(getEnvelope(row, rowId), rowId);
+
+            if (replaced == null)
+                rowCnt++; // No replace.
+
+            return replaced;
+        }
+        finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Builds the spatial key for a row from the bounding envelope of its geometry column.
+     *
+     * @param row Row.
+     * @param rowId Row id.
+     * @return Envelope.
+     */
+    private SpatialKey getEnvelope(SearchRow row, long rowId) {
+        Value geomVal = row.getValue(columnIds[0]);
+
+        Geometry geom = ((ValueGeometry) geomVal.convertTo(Value.GEOMETRY)).getGeometry();
+
+        Envelope env = geom.getEnvelopeInternal();
+
+        float minX = (float) env.getMinX();
+        float maxX = (float) env.getMaxX();
+        float minY = (float) env.getMinY();
+        float maxY = (float) env.getMaxY();
+
+        return new SpatialKey(rowId, minX, maxX, minY, maxY);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The row is located by its key column; the tree entry is removed using the envelope of the
+     * passed row.
+     */
+    @Override public GridH2Row remove(SearchRow row) {
+        Lock writeLock = lock.writeLock();
+
+        writeLock.lock();
+
+        try {
+            checkClosed();
+
+            Value key = row.getValue(keyCol);
+
+            assert key != null;
+
+            Long rowId = keyToId.remove(key);
+
+            assert rowId != null;
+
+            GridH2Row removedRow = idToRow.remove(rowId);
+
+            assert removedRow != null;
+
+            boolean removedFromTree = treeMap.remove(getEnvelope(row, rowId), rowId);
+
+            if (!removedFromTree)
+                throw DbException.throwInternalError("row not found");
+
+            rowCnt--;
+
+            return removedRow;
+        }
+        finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Marks the index as closed and closes the backing store under the write lock.
+     */
+    @Override public void close(Session ses) {
+        Lock writeLock = lock.writeLock();
+
+        writeLock.lock();
+
+        try {
+            closed = true;
+
+            store.close();
+        }
+        finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The cost is the row count plus the H2 row offset constant; it is reduced to
+     * {@code 3 + rows / 4} when any indexed column has a spatial-intersects condition.
+     */
+    @Override protected long getCostRangeIndex(int[] masks, long rowCnt, TableFilter filter, SortOrder sortOrder) {
+        long rows = rowCnt + Constants.COST_ROW_OFFSET;
+
+        if (masks == null)
+            return rows;
+
+        for (Column column : columns) {
+            if ((masks[column.getColumnId()] & IndexCondition.SPATIAL_INTERSECTS) != 0)
+                return 3 + rows / 4;
+        }
+
+        return rows;
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates to {@link #getCostRangeIndex} with the current row count of this index.
+ */
+ @Override public double getCost(Session ses, int[] masks, TableFilter filter, SortOrder sortOrder) {
+ return getCostRangeIndex(masks, rowCnt, filter, sortOrder);
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The bounds are ignored: the cursor iterates over all rows of the index.
+     */
+    @Override public Cursor find(Session ses, SearchRow first, SearchRow last) {
+        Lock readLock = lock.readLock();
+
+        readLock.lock();
+
+        try {
+            checkClosed();
+
+            Iterator<GridH2Row> rows = rowIterator(treeMap.keySet().iterator());
+
+            return new GridH2Cursor(rows);
+        }
+        finally {
+            readLock.unlock();
+        }
+    }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return Always {@code true}; note that {@link #findFirstOrLast} rejects requests for the last row.
+ */
+ @Override public boolean canGetFirstOrLast() {
+ return true;
+ }
+
+    /**
+     * Resolves spatial keys to rows. The rows are copied into a list before the filtered iterator
+     * over them is returned.
+     *
+     * @param i Spatial key iterator.
+     * @return Iterator over rows.
+     */
+    private Iterator<GridH2Row> rowIterator(Iterator<SpatialKey> i) {
+        if (!i.hasNext())
+            return Collections.emptyIterator();
+
+        List<GridH2Row> rows = new ArrayList<>();
+
+        while (i.hasNext()) {
+            SpatialKey key = i.next();
+
+            GridH2Row row = idToRow.get(key.getId());
+
+            assert row != null;
+
+            rows.add(row);
+        }
+
+        return filter(rows.iterator());
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only {@code first == true} is supported; the cursor holds the first row of the iteration or
+     * {@code null} if there are no rows.
+     */
+    @Override public Cursor findFirstOrLast(Session ses, boolean first) {
+        Lock readLock = lock.readLock();
+
+        readLock.lock();
+
+        try {
+            checkClosed();
+
+            if (!first)
+                throw DbException.throwInternalError("Spatial Index can only be fetch by ascending order");
+
+            Iterator<GridH2Row> rows = rowIterator(treeMap.keySet().iterator());
+
+            GridH2Row head = rows.hasNext() ? rows.next() : null;
+
+            return new SingleRowCursor(head);
+        }
+        finally {
+            readLock.unlock();
+        }
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the counter maintained by {@link #put} and {@link #remove}; the session is not used.
+ */
+ @Override public long getRowCount(Session ses) {
+ return rowCnt;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the same counter as {@link #getRowCount(Session)}.
+ */
+ @Override public long getRowCountApproximation() {
+ return rowCnt;
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * With a {@code null} intersection all rows are returned, otherwise only rows whose envelope
+     * intersects the envelope of the given row. Like the other lookups, fails with an H2 internal
+     * error if the index has already been closed.
+     */
+    @Override public Cursor findByGeometry(TableFilter filter, SearchRow intersection) {
+        Lock l = lock.readLock();
+
+        l.lock();
+
+        try {
+            // The backing store must not be queried after close(); find() and findFirstOrLast() check this too.
+            checkClosed();
+
+            if (intersection == null)
+                return find(filter.getSession(), null, null);
+
+            return new GridH2Cursor(rowIterator(treeMap.findIntersectingKeys(getEnvelope(intersection, 0))));
+        }
+        finally {
+            l.unlock();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Table.java
new file mode 100644
index 0000000..e1c91fc
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Table.java
@@ -0,0 +1,889 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+
+import org.gridgain.grid.*;
+import org.gridgain.grid.util.offheap.unsafe.*;
+import org.h2.api.*;
+import org.h2.command.ddl.*;
+import org.h2.engine.*;
+import org.h2.index.*;
+import org.h2.message.*;
+import org.h2.result.*;
+import org.h2.schema.*;
+import org.h2.table.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * H2 Table implementation.
+ */
+public class GridH2Table extends TableBase {
+ /** Space name, may be {@code null}. */
+ private final String spaceName;
+
+ /** Row descriptor, may be {@code null}. */
+ private final GridH2RowDescriptor desc;
+
+ /** Indexes: scan index at 0, primary key at 1, remaining indexes after it. */
+ private final ArrayList<Index> idxs;
+
+ /** Read lock is held by updates and swap/unswap, write lock by snapshot taking, rebuild and close. */
+ private final ReadWriteLock lock;
+
+ /** Sessions that currently hold a lock on this table. */
+ private final Set<Session> sessions = Collections.newSetFromMap(new ConcurrentHashMap8<Session, Boolean>());
+
+ /** Cached snapshot of all indexes except scan; reset to {@code null} after every successful update. */
+ private volatile Object[] actualSnapshot;
+
+ /**
+ * Creates table.
+ * <p>
+ * Indexes are created by the factory; a scan index wrapping the first created index is then inserted
+ * at position 0, which shifts the created indexes by one.
+ *
+ * @param createTblData Table description.
+ * @param desc Row descriptor.
+ * @param idxsFactory Indexes factory.
+ * @param spaceName Space name.
+ */
+ public GridH2Table(CreateTableData createTblData, @Nullable GridH2RowDescriptor desc, IndexesFactory idxsFactory,
+ @Nullable String spaceName) {
+ super(createTblData);
+
+ assert idxsFactory != null;
+
+ this.desc = desc;
+ this.spaceName = spaceName;
+
+ // Note: the factory receives this table before construction is complete.
+ idxs = idxsFactory.createIndexes(this);
+
+ assert idxs != null;
+ assert idxs.size() >= 1;
+
+ lock = new ReentrantReadWriteLock();
+
+ // Add scan index at 0 which is required by H2.
+ idxs.add(0, new ScanIndex(index(0)));
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return Always 0.
+ */
+ @Override public long getDiskSpaceUsed() {
+ return 0;
+ }
+
+ /**
+ * @return Row descriptor passed to the constructor, may be {@code null}.
+ */
+ public GridH2RowDescriptor rowDescriptor() {
+ return desc;
+ }
+
+ /**
+ * Should be called when entry is swapped. Looks up the row by key in the primary key index and
+ * notifies it.
+ *
+ * @param key Entry key.
+ * @return {@code true} If row was found.
+ * @throws GridException If failed.
+ */
+ public boolean onSwap(Object key) throws GridException {
+ // Null value tells the shared implementation to swap.
+ return onSwapUnswap(key, null);
+ }
+
+ /**
+ * Should be called when entry is unswapped. Looks up the row by key in the primary key index and
+ * passes the value to it.
+ *
+ * @param key Key.
+ * @param val Value, must not be {@code null}.
+ * @return {@code true} If row was found.
+ * @throws GridException If failed.
+ */
+ public boolean onUnswap(Object key, Object val) throws GridException {
+ // A null value would be interpreted as a swap request by the shared implementation.
+ assert val != null : "Key=" + key;
+
+ return onSwapUnswap(key, val);
+ }
+
+    /**
+     * Swaps or unswaps row.
+     * <p>
+     * The row is looked up in the primary key index under the table read lock and, if off-heap
+     * memory is used, inside a guard section. The read lock is released even if entering the guard
+     * section fails, and the guard section is left only if it was actually entered.
+     *
+     * @param key Key.
+     * @param val Value for promote or {@code null} if we have to swap.
+     * @return {@code true} if row was found and swapped/unswapped.
+     * @throws GridException If failed.
+     */
+    @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+    private boolean onSwapUnswap(Object key, @Nullable Object val) throws GridException {
+        assert key != null;
+
+        GridH2TreeIndex pk = pk();
+
+        assert desc != null;
+
+        GridH2AbstractKeyValueRow row = desc.createRow(key, null, 0); // Create search row.
+
+        GridUnsafeMemory mem = desc.memory();
+
+        Lock readLock = lock.readLock();
+
+        readLock.lock();
+
+        boolean guarded = false;
+
+        try {
+            if (mem != null) {
+                desc.guard().begin();
+
+                guarded = true;
+            }
+
+            row = pk.findOne(row);
+
+            if (row == null)
+                return false;
+
+            if (val == null)
+                row.onSwap();
+            else
+                row.onUnswap(val);
+
+            return true;
+        }
+        finally {
+            readLock.unlock();
+
+            if (guarded)
+                desc.guard().end();
+        }
+    }
+
+ /**
+ * @return Space name passed to the constructor, may be {@code null}.
+ */
+ @Nullable String spaceName() {
+ return spaceName;
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Registers the session (a session that is already registered returns immediately) and makes
+     * every index except scan take a snapshot. A cached snapshot is reused without locking;
+     * otherwise a new one is taken under the table write lock, which is acquired with a timeout
+     * that doubles on every attempt.
+     *
+     * @throws GridRuntimeException If the thread is interrupted while waiting for the write lock;
+     *      the interrupt status of the thread is restored before throwing.
+     */
+    @SuppressWarnings({"LockAcquiredButNotSafelyReleased", "SynchronizationOnLocalVariableOrMethodParameter", "unchecked"})
+    @Override public void lock(@Nullable final Session ses, boolean exclusive, boolean force) {
+        if (ses != null) {
+            if (!sessions.add(ses))
+                return;
+
+            synchronized (ses) {
+                ses.addLock(this);
+            }
+        }
+
+        Object[] snapshot;
+
+        for (long waitTime = 100;; waitTime *= 2) { // Increase wait time to avoid starvation.
+            snapshot = actualSnapshot;
+
+            if (snapshot != null) {
+                // Reuse existing snapshot without locking.
+                for (int i = 1, len = idxs.size(); i < len; i++)
+                    index(i).takeSnapshot(snapshot[i - 1]);
+
+                return;
+            }
+
+            try {
+                if (lock.writeLock().tryLock(waitTime, TimeUnit.MILLISECONDS))
+                    break;
+            }
+            catch (InterruptedException e) {
+                // Restore the interrupt status so that callers up the stack can still observe it.
+                Thread.currentThread().interrupt();
+
+                throw new GridRuntimeException("Thread got interrupted while trying to acquire index lock.", e);
+            }
+        }
+
+        boolean snapshoted = false;
+
+        try {
+            snapshot = actualSnapshot; // Try again inside of the lock.
+
+            if (snapshot == null) {
+                snapshot = takeIndexesSnapshot();
+
+                if (desc == null || desc.memory() == null) // This optimization is disabled for off-heap index.
+                    actualSnapshot = snapshot;
+
+                snapshoted = true;
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+
+        // Another thread published a snapshot while we were waiting for the lock: apply it to the indexes.
+        if (!snapshoted) {
+            for (int i = 1, len = idxs.size(); i < len; i++)
+                index(i).takeSnapshot(snapshot[i - 1]);
+        }
+    }
+
+    /**
+     * Takes a new snapshot of every index except scan.
+     * <p>
+     * Must be called inside of write lock because when using multiple indexes we have to ensure that
+     * all of them have the same contents at snapshot taking time.
+     *
+     * @return New indexes data snapshot; element {@code i - 1} belongs to the index at position {@code i}.
+     */
+    @SuppressWarnings("unchecked")
+    private Object[] takeIndexesSnapshot() {
+        int idxCnt = idxs.size();
+
+        Object[] snapshot = new ConcurrentNavigableMap[idxCnt - 1];
+
+        // Position 0 is the scan index, it is skipped.
+        for (int i = 1; i < idxCnt; i++)
+            snapshot[i - 1] = index(i).takeSnapshot(null);
+
+        return snapshot;
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Releases nothing; only asserts that the session no longer holds a lock on this table.
+ */
+ @Override public void close(Session ses) {
+ assert !sessions.contains(ses);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Unregisters the session (if given) and releases the snapshots of all indexes except scan.
+ */
+ @Override public void unlock(@Nullable Session ses) {
+ if (ses != null) {
+ boolean res = sessions.remove(ses);
+
+ // The session is expected to have been registered by lock().
+ assert res;
+ }
+
+ for (int i = 1, len = idxs.size(); i < len; i++) // Release snapshots on all except first which is scan.
+ index(i).releaseSnapshot();
+ }
+
+    /**
+     * Closes table and releases resources: every index except scan is closed under the write lock.
+     */
+    public void close() {
+        Lock writeLock = lock.writeLock();
+
+        writeLock.lock();
+
+        try {
+            int idxCnt = idxs.size();
+
+            for (int i = 1; i < idxCnt; i++)
+                index(i).close(null);
+        }
+        finally {
+            writeLock.unlock();
+        }
+    }
+
+ /**
+ * Updates table for given key. If value is null then row with given key will be removed from table,
+ * otherwise value and expiration time will be updated or new row will be added.
+ *
+ * @param key Key.
+ * @param val Value, {@code null} to remove the row.
+ * @param expirationTime Expiration time.
+ * @return {@code True} if operation succeeded.
+ * @throws GridException If failed.
+ */
+ public boolean update(Object key, @Nullable Object val, long expirationTime) throws GridException {
+ assert desc != null;
+
+ GridH2Row row = desc.createRow(key, val, expirationTime);
+
+ // Null value means delete.
+ return doUpdate(row, val == null);
+ }
+
+ /**
+ * Gets index by its position in the index list (0 is scan, 1 is primary key).
+ *
+ * @param idx Index in list.
+ * @return Index.
+ */
+ private GridH2IndexBase index(int idx) {
+ return (GridH2IndexBase)idxs.get(idx);
+ }
+
+ /**
+ * Gets primary key, which is the index at position 1 (position 0 is the scan index).
+ *
+ * @return Primary key.
+ */
+ private GridH2TreeIndex pk() {
+ return (GridH2TreeIndex)idxs.get(1);
+ }
+
+    /**
+     * Puts the row to, or removes it from, the primary key and all secondary indexes.
+     * <p>
+     * Called by {@link #update(Object, Object, long)}. Runs under the table read lock and, if off-heap
+     * memory is used, inside a guard section. The read lock is released even if entering the guard
+     * section fails, and the guard section is left only if it was actually entered.
+     *
+     * @param row Row.
+     * @param del If given row should be deleted from table.
+     * @return {@code True} if operation succeeded, {@code false} if the row to delete was not found.
+     * @throws GridException If failed.
+     */
+    @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+    boolean doUpdate(GridH2Row row, boolean del) throws GridException {
+        // Here we assume that each key can't be updated concurrently and case when different indexes
+        // getting updated from different threads with different rows with the same key is impossible.
+        GridUnsafeMemory mem = desc == null ? null : desc.memory();
+
+        Lock readLock = lock.readLock();
+
+        readLock.lock();
+
+        boolean guarded = false;
+
+        try {
+            if (mem != null) {
+                desc.guard().begin();
+
+                guarded = true;
+            }
+
+            GridH2TreeIndex pk = pk();
+
+            if (!del) {
+                GridH2Row old = pk.put(row); // Put to PK.
+
+                int len = idxs.size();
+
+                int i = 1;
+
+                // Put row if absent to all indexes sequentially.
+                // Start from 2 because 0 - Scan (don't need to update), 1 - PK (already updated).
+                while (++i < len) {
+                    GridH2IndexBase idx = index(i);
+
+                    assert !idx.getIndexType().isUnique() : "Unique indexes are not supported.";
+
+                    GridH2Row old2 = idx.put(row);
+
+                    if (old2 != null) { // Row was replaced in index.
+                        if (!eq(pk, old2, old))
+                            throw new IllegalStateException("Row conflict should never happen, unique indexes are " +
+                                "not supported.");
+                    }
+                    else if (old != null) // Row was not replaced, need to remove manually.
+                        idx.remove(old);
+                }
+            }
+            else {
+                // index(1) is PK, get full row from there (search row here contains only key but no other columns).
+                row = pk.remove(row);
+
+                if (row != null) {
+                    // Remove row from all indexes.
+                    // Start from 2 because 0 - Scan (don't need to update), 1 - PK (already updated).
+                    for (int i = 2, len = idxs.size(); i < len; i++) {
+                        Row res = index(i).remove(row);
+
+                        assert eq(pk, res, row): "\n" + row + "\n" + res;
+                    }
+                }
+                else
+                    return false;
+            }
+
+            // The snapshot is not actual after update.
+            actualSnapshot = null;
+
+            return true;
+        }
+        finally {
+            readLock.unlock();
+
+            if (guarded)
+                desc.guard().end();
+        }
+    }
+
+ /**
+ * Checks whether two rows are the same row from the primary key point of view.
+ *
+ * @param pk Primary key index used to compare the rows.
+ * @param r1 First row, may be {@code null}.
+ * @param r2 Second row, may be {@code null}.
+ * @return {@code true} if both references are identical (including both being {@code null})
+ * or both rows are non-null and compare as equal in the primary key index.
+ */
+ private static boolean eq(Index pk, SearchRow r1, SearchRow r2) {
+ if (r1 == r2)
+ return true;
+
+ if (r1 == null || r2 == null)
+ return false;
+
+ return pk.compareRows(r1, r2) == 0;
+ }
+
+ /**
+ * For testing only. Collects all indexes of the table except the scan index stored at position 0.
+ *
+ * @return New list with indexes starting from the primary key.
+ */
+ ArrayList<GridH2IndexBase> indexes() {
+ ArrayList<GridH2IndexBase> res = new ArrayList<>(idxs.size() - 1);
+
+ int pos = 1;
+
+ int cnt = idxs.size();
+
+ while (pos < cnt) {
+ res.add(index(pos));
+
+ pos++;
+ }
+
+ return res;
+ }
+
+ /**
+ * Rebuilds all indexes of this table while holding the table write lock.
+ * <p>
+ * Every index starting from the primary key (position 1) is replaced by the result of its
+ * {@code rebuild()} call; the scan index at position 0 is re-created on top of the new primary key.
+ * <p>
+ * If the rebuild is interrupted, the method returns early (indexes processed so far stay replaced)
+ * and the interrupt status of the calling thread is restored so that the caller can observe it.
+ */
+ public void rebuildIndexes() {
+ GridUnsafeMemory memory = desc == null ? null : desc.memory();
+
+ lock.writeLock().lock();
+
+ try {
+ if (memory == null && actualSnapshot == null)
+ actualSnapshot = takeIndexesSnapshot(); // Allow read access while we are rebuilding indexes.
+
+ for (int i = 1, len = idxs.size(); i < len; i++) {
+ GridH2IndexBase newIdx = index(i).rebuild();
+
+ idxs.set(i, newIdx);
+
+ if (i == 1) // ScanIndex at 0 and actualSnapshot can contain references to old indexes, reset them.
+ idxs.set(0, new ScanIndex(newIdx));
+ }
+ }
+ catch (InterruptedException ignored) {
+ // Do not swallow the interruption: re-assert the flag for the caller.
+ Thread.currentThread().interrupt();
+ }
+ finally {
+ lock.writeLock().unlock();
+
+ actualSnapshot = null;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Not supported by this table: always throws H2 "unsupported" exception.
+ */
+ @Override public Index addIndex(Session ses, String s, int i, IndexColumn[] idxCols, IndexType idxType,
+ boolean b, String s1) {
+ throw DbException.getUnsupportedException("addIndex");
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Not supported by this table: always throws H2 "unsupported" exception.
+ */
+ @Override public void removeRow(Session ses, Row row) {
+ throw DbException.getUnsupportedException("removeRow");
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Not supported by this table: always throws H2 "unsupported" exception.
+ */
+ @Override public void truncate(Session ses) {
+ throw DbException.getUnsupportedException("truncate");
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Not supported by this table: always throws H2 "unsupported" exception.
+ */
+ @Override public void addRow(Session ses, Row row) {
+ throw DbException.getUnsupportedException("addRow");
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Altering is not supported by this table: always throws H2 "unsupported" exception.
+ */
+ @Override public void checkSupportAlter() {
+ throw DbException.getUnsupportedException("alter");
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Always reports the {@code EXTERNAL_TABLE_ENGINE} table type.
+ */
+ @Override public String getTableType() {
+ return EXTERNAL_TABLE_ENGINE;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the index at position 0 of {@link #getIndexes()}.
+ */
+ @Override public Index getScanIndex(Session ses) {
+ return getIndexes().get(0); // Scan must be always first index.
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the primary key index, i.e. the index at position 1 of {@link #getIndexes()}.
+ */
+ @Override public Index getUniqueIndex() {
+ return getIndexes().get(1); // PK index is always second.
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the internal index list itself, not a copy.
+ */
+ @Override public ArrayList<Index> getIndexes() {
+ return idxs;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This implementation always returns {@code false}.
+ */
+ @Override public boolean isLockedExclusively() {
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This implementation always returns {@code false}, whatever session is passed.
+ */
+ @Override public boolean isLockedExclusivelyBy(Session ses) {
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This implementation does not track modification ids and always returns {@code 0}.
+ */
+ @Override public long getMaxDataModificationId() {
+ return 0;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This implementation always returns {@code true}.
+ */
+ @Override public boolean isDeterministic() {
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This implementation always returns {@code true}; the count itself comes from
+ * {@link #getRowCount(Session)}.
+ */
+ @Override public boolean canGetRowCount() {
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This implementation always returns {@code true}.
+ */
+ @Override public boolean canDrop() {
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The count is taken from the primary key index returned by {@link #getUniqueIndex()}.
+ */
+ @Override public long getRowCount(@Nullable Session ses) {
+ return getUniqueIndex().getRowCount(ses);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The approximation is taken from the primary key index returned by {@link #getUniqueIndex()}.
+ */
+ @Override public long getRowCountApproximation() {
+ return getUniqueIndex().getRowCountApproximation();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Renaming is not supported by this table: always throws H2 "unsupported" exception.
+ */
+ @Override public void checkRename() {
+ throw DbException.getUnsupportedException("rename");
+ }
+
+ /**
+ * Builds an H2 index column descriptor for one of the columns of this table.
+ *
+ * @param col Position of the column in this table.
+ * @param sorting Sorting order, one of {@link SortOrder} constants.
+ * @return New index column bound to the table column, carrying its name and the given sort type.
+ */
+ public IndexColumn indexColumn(int col, int sorting) {
+ Column tblCol = getColumn(col);
+
+ IndexColumn idxCol = new IndexColumn();
+
+ idxCol.sortType = sorting;
+ idxCol.column = tblCol;
+ idxCol.columnName = tblCol.getName();
+
+ return idxCol;
+ }
+
+ /**
+ * H2 Table engine.
+ * <p>
+ * The static {@code createTable(Connection, ...)} method stores table parameters in static fields, executes
+ * the DDL statement with this class named as the engine, and returns the table that the instance-level
+ * {@link #createTable(CreateTableData)} built from those fields. The static method is {@code synchronized},
+ * so only one table creation uses the static fields at a time; all of them are cleared when it completes.
+ */
+ @SuppressWarnings({"PublicInnerClass", "FieldAccessedSynchronizedAndUnsynchronized"})
+ public static class Engine implements TableEngine {
+ /** Row descriptor for the table being created. */
+ private static GridH2RowDescriptor rowDesc;
+
+ /** Indexes factory for the table being created. */
+ private static IndexesFactory idxsFactory;
+
+ /** Table produced by the last {@link #createTable(CreateTableData)} call. */
+ private static GridH2Table resTbl;
+
+ /** Space name for the table being created. */
+ private static String spaceName;
+
+ /** {@inheritDoc} */
+ @Override public TableBase createTable(CreateTableData createTblData) {
+ resTbl = new GridH2Table(createTblData, rowDesc, idxsFactory, spaceName);
+
+ return resTbl;
+ }
+
+ /**
+ * Creates table using given connection, DDL clause for given type descriptor and list of indexes.
+ *
+ * @param conn Connection.
+ * @param sql DDL clause.
+ * @param desc Row descriptor.
+ * @param factory Indexes factory.
+ * @param space Space name.
+ * @throws SQLException If failed.
+ * @return Created table.
+ */
+ public static synchronized GridH2Table createTable(Connection conn, String sql,
+ @Nullable GridH2RowDescriptor desc, IndexesFactory factory, String space)
+ throws SQLException {
+ rowDesc = desc;
+ idxsFactory = factory;
+ spaceName = space;
+
+ try {
+ try (Statement s = conn.createStatement()) {
+ s.execute(sql + " engine \"" + Engine.class.getName() + "\"");
+ }
+
+ return resTbl;
+ }
+ finally {
+ // Clear every hand-over field so that nothing from this creation stays reachable from statics.
+ resTbl = null;
+ idxsFactory = null;
+ rowDesc = null;
+ spaceName = null;
+ }
+ }
+ }
+
+ /**
+ * Type which can create indexes list for given table.
+ */
+ @SuppressWarnings({"PackageVisibleInnerClass", "PublicInnerClass"})
+ public static interface IndexesFactory {
+ /**
+ * Create list of indexes. First must be primary key, after that all unique indexes and
+ * only then non-unique indexes.
+ * All indexes must be subtypes of {@link GridH2TreeIndex}.
+ * <p>
+ * NOTE(review): the table update path asserts that indexes other than the primary key are not unique
+ * ("Unique indexes are not supported."), so the "unique indexes" part of this ordering looks stale - confirm.
+ *
+ * @param tbl Table to create indexes for.
+ * @return List of indexes.
+ */
+ ArrayList<Index> createIndexes(GridH2Table tbl);
+ }
+
+ /**
+ * Scan index: a wrapper that exposes the delegate index (the table passes its primary key index here)
+ * to H2 with the scan index type. Lookups ignore search bounds and always walk the whole delegate;
+ * most metadata calls are forwarded to the delegate, while modification and maintenance calls are
+ * either no-ops or unsupported.
+ */
+ @SuppressWarnings("PackageVisibleInnerClass")
+ static class ScanIndex implements Index {
+ /** Suffix appended to the delegate name to form the scan index name. */
+ static final String SCAN_INDEX_NAME_SUFFIX = "__SCAN_";
+
+ /** Index type reported to H2. */
+ private static final IndexType TYPE = IndexType.createScan(false);
+
+ /** Index all calls are delegated to. */
+ private final GridH2IndexBase delegate;
+
+ /**
+ * Constructor.
+ *
+ * @param delegate Index delegate to.
+ */
+ private ScanIndex(GridH2IndexBase delegate) {
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getDiskSpaceUsed() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void add(Session ses, Row row) {
+ delegate.add(ses, row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canFindNext() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canGetFirstOrLast() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canScan() {
+ return delegate.canScan();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close(Session ses) {
+ delegate.close(ses);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void commit(int operation, Row row) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareRows(SearchRow rowData, SearchRow compare) {
+ return delegate.compareRows(rowData, compare);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor find(TableFilter filter, SearchRow first, SearchRow last) {
+ return find(filter.getSession(), first, last);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The given bounds are ignored: the delegate is always asked for its full range.
+ */
+ @Override public Cursor find(Session ses, SearchRow first, SearchRow last) {
+ return delegate.find(ses, null, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor findFirstOrLast(Session ses, boolean first) {
+ throw DbException.getUnsupportedException("SCAN");
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor findNext(Session ses, SearchRow higherThan, SearchRow last) {
+ throw DbException.throwInternalError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getColumnIndex(Column col) {
+ return -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Column[] getColumns() {
+ return delegate.getColumns();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The cost does not depend on masks, filter or sort order: it is the approximate row count
+ * plus {@code Constants.COST_ROW_OFFSET}.
+ */
+ @Override public double getCost(Session ses, int[] masks, TableFilter tblFilter, SortOrder sortOrder) {
+ return getRowCountApproximation() + Constants.COST_ROW_OFFSET;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IndexColumn[] getIndexColumns() {
+ return delegate.getIndexColumns();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IndexType getIndexType() {
+ return TYPE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getPlanSQL() {
+ return delegate.getTable().getSQL() + "." + SCAN_INDEX_NAME_SUFFIX;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Row getRow(Session ses, long key) {
+ return delegate.getRow(ses, key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getRowCount(Session ses) {
+ return delegate.getRowCount(ses);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getRowCountApproximation() {
+ return delegate.getRowCountApproximation();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Table getTable() {
+ return delegate.getTable();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isRowIdIndex() {
+ return delegate.isRowIdIndex();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean needRebuild() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(Session ses) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(Session ses, Row row) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setSortedInsertMode(boolean sortedInsertMode) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void truncate(Session ses) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Schema getSchema() {
+ return delegate.getSchema();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isHidden() {
+ return delegate.isHidden();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkRename() {
+ throw DbException.getUnsupportedException("rename");
+ }
+
+ /** {@inheritDoc} */
+ @Override public ArrayList<DbObject> getChildren() {
+ return delegate.getChildren();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getComment() {
+ return delegate.getComment();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getCreateSQL() {
+ return null; // Scan should return null.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getCreateSQLForCopy(Table tbl, String quotedName) {
+ return delegate.getCreateSQLForCopy(tbl, quotedName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Database getDatabase() {
+ return delegate.getDatabase();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getDropSQL() {
+ return delegate.getDropSQL();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getId() {
+ return delegate.getId();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getName() {
+ return delegate.getName() + SCAN_INDEX_NAME_SUFFIX;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getSQL() {
+ return delegate.getSQL();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getType() {
+ return delegate.getType();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isTemporary() {
+ return delegate.isTemporary();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeChildrenAndResources(Session ses) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void rename(String newName) {
+ throw DbException.getUnsupportedException("rename");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setComment(String comment) {
+ throw DbException.getUnsupportedException("comment");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setTemporary(boolean temporary) {
+ throw DbException.getUnsupportedException("temporary");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2TreeIndex.java
new file mode 100644
index 0000000..e0ad5c2
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -0,0 +1,469 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+import org.apache.ignite.spi.indexing.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.snaptree.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.grid.util.offheap.unsafe.*;
+import org.h2.engine.*;
+import org.h2.index.*;
+import org.h2.result.*;
+import org.h2.table.*;
+import org.h2.value.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Base class for snapshotable tree indexes.
+ */
+@SuppressWarnings("ComparatorNotSerializable")
+public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridSearchRowPointer> {
+ /** */
+ protected final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree;
+
+ /** */
+ private final ThreadLocal<ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row>> snapshot =
+ new ThreadLocal<>();
+
+ /**
+ * Constructor with index initialization.
+ *
+ * @param name Index name.
+ * @param tbl Table.
+ * @param pk If this index is primary key.
+ * @param keyCol Primary key column index.
+ * @param valCol Value column index.
+ * @param cols Index columns list.
+ */
+ @SuppressWarnings("unchecked")
+ public GridH2TreeIndex(String name, GridH2Table tbl, boolean pk, int keyCol, int valCol, IndexColumn... cols) {
+ super(keyCol, valCol);
+ if (!pk) {
+ // For other indexes we add primary key at the end to avoid conflicts.
+ cols = Arrays.copyOf(cols, cols.length + 1);
+
+ cols[cols.length - 1] = tbl.indexColumn(keyCol, SortOrder.ASCENDING);
+ }
+
+ IndexColumn.mapColumns(cols, tbl);
+
+ initBaseIndex(tbl, 0, name, cols,
+ pk ? IndexType.createUnique(false, false) : IndexType.createNonUnique(false, false, false));
+
+ final GridH2RowDescriptor desc = tbl.rowDescriptor();
+
+ tree = desc == null || desc.memory() == null ? new SnapTreeMap<GridSearchRowPointer, GridH2Row>(this) {
+ @Override protected void afterNodeUpdate_nl(Node<GridSearchRowPointer, GridH2Row> node, Object val) {
+ if (val != null)
+ node.key = (GridSearchRowPointer)val;
+ }
+
+ @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
+ if (key instanceof ComparableRow)
+ return (Comparable<? super SearchRow>)key;
+
+ return super.comparable(key);
+ }
+ } : new GridOffHeapSnapTreeMap<GridSearchRowPointer, GridH2Row>(desc, desc, desc.memory(), desc.guard(), this) {
+ @Override protected void afterNodeUpdate_nl(long node, GridH2Row val) {
+ final long oldKey = keyPtr(node);
+
+ if (val != null) {
+ key(node, val);
+
+ guard.finalizeLater(new Runnable() {
+ @Override public void run() {
+ desc.createPointer(oldKey).decrementRefCount();
+ }
+ });
+ }
+ }
+
+ @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
+ if (key instanceof ComparableRow)
+ return (Comparable<? super SearchRow>)key;
+
+ return super.comparable(key);
+ }
+ };
+ }
+
+ /**
+ * Closes index and releases resources.
+ */
+ public void close() {
+ if (tree instanceof Closeable)
+ U.closeQuiet((Closeable)tree);
+ }
+
+ /**
+ * Takes snapshot to be used in current thread. If argument is null it will be taken from current trees.
+ *
+ * @param s Map to be used as snapshot if not null.
+ * @return Taken snapshot or given argument back.
+ */
+ @SuppressWarnings("unchecked")
+ @Override public Object takeSnapshot(@Nullable Object s) {
+ assert snapshot.get() == null;
+
+ if (s == null)
+ s = tree instanceof SnapTreeMap ? ((SnapTreeMap)tree).clone() :
+ ((GridOffHeapSnapTreeMap)tree).clone();
+
+ snapshot.set((ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row>)s);
+
+ return s;
+ }
+
+ /**
+ * Releases snapshot for current thread.
+ */
+ @Override public void releaseSnapshot() {
+ ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> s = snapshot.get();
+
+ snapshot.remove();
+
+ if (s instanceof Closeable)
+ U.closeQuiet((Closeable)s);
+ }
+
+ /**
+ * @return Snapshot for current thread if there is one.
+ */
+ private ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead() {
+ ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> res = snapshot.get();
+
+ if (res == null)
+ res = tree;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close(Session ses) {
+ assert snapshot.get() == null;
+
+ if (tree instanceof Closeable)
+ U.closeQuiet((Closeable)tree);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getRowCount(@Nullable Session ses) {
+ GridIndexingQueryFilter f = filters.get();
+
+ // Fast path if we don't need to perform any filtering.
+ if (f == null || f.forSpace(((GridH2Table)getTable()).spaceName()) == null)
+ return treeForRead().size();
+
+ Iterator<GridH2Row> iter = doFind(null, false, null);
+
+ long size = 0;
+
+ while (iter.hasNext()) {
+ iter.next();
+
+ size++;
+ }
+
+ return size;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getRowCountApproximation() {
+ return tree.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compare(GridSearchRowPointer r1, GridSearchRowPointer r2) {
+ // Second row here must be data row if first is a search row.
+ return -compareRows(r2, r1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ SB sb = new SB((indexType.isUnique() ? "Unique index '" : "Index '") + getName() + "' [");
+
+ boolean first = true;
+
+ for (IndexColumn col : getIndexColumns()) {
+ if (first)
+ first = false;
+ else
+ sb.a(", ");
+
+ sb.a(col.getSQL());
+ }
+
+ sb.a(" ]");
+
+ return sb.toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override public double getCost(Session ses, int[] masks, TableFilter filter, SortOrder sortOrder) {
+ return getCostRangeIndex(masks, getRowCountApproximation(), filter, sortOrder);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canFindNext() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor find(Session ses, @Nullable SearchRow first, @Nullable SearchRow last) {
+ return new GridH2Cursor(doFind(first, true, last));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor findNext(Session ses, SearchRow higherThan, SearchRow last) {
+ return new GridH2Cursor(doFind(higherThan, false, last));
+ }
+
+ /**
+ * Finds row with key equal one in given search row.
+ * WARNING!! Method call must be protected by {@link GridUnsafeGuard#begin()}
+ * {@link GridUnsafeGuard#end()} block.
+ *
+ * @param row Search row.
+ * @return Row.
+ */
+ public GridH2AbstractKeyValueRow findOne(GridSearchRowPointer row) {
+ return (GridH2AbstractKeyValueRow)tree.get(row);
+ }
+
+ /**
+ * Returns sub-tree bounded by given values.
+ *
+ * @param first Lower bound.
+ * @param includeFirst Whether lower bound should be inclusive.
+ * @param last Upper bound always inclusive.
+ * @return Iterator over rows in given range.
+ */
+ @SuppressWarnings("unchecked")
+ private Iterator<GridH2Row> doFind(@Nullable SearchRow first, boolean includeFirst,
+ @Nullable SearchRow last) {
+ ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = treeForRead();
+
+ includeFirst &= first != null;
+
+ NavigableMap<GridSearchRowPointer, GridH2Row> range = subTree(t, comparable(first, includeFirst ? -1 : 1),
+ comparable(last, 1));
+
+ if (range == null)
+ return new GridEmptyIterator<>();
+
+ return filter(range.values().iterator());
+ }
+
+ /**
+ * @param row Row.
+ * @param bias Bias.
+ * @return Comparable row.
+ */
+ private GridSearchRowPointer comparable(SearchRow row, int bias) {
+ if (row == null)
+ return null;
+
+ if (bias == 0 && row instanceof GridH2Row)
+ return (GridSearchRowPointer)row;
+
+ return new ComparableRow(row, bias);
+ }
+
+ /**
+ * Takes sup-map from given one.
+ *
+ * @param map Map.
+ * @param first Lower bound.
+ * @param last Upper bound.
+ * @return Sub-map.
+ */
+ @SuppressWarnings({"IfMayBeConditional", "TypeMayBeWeakened"})
+ private NavigableMap<GridSearchRowPointer, GridH2Row> subTree(NavigableMap<GridSearchRowPointer, GridH2Row> map,
+ @Nullable GridSearchRowPointer first, @Nullable GridSearchRowPointer last) {
+ // We take exclusive bounds because it is possible that one search row will be equal to multiple key rows
+ // in tree and we must return them all.
+ if (first == null) {
+ if (last == null)
+ return map;
+ else
+ return map.headMap(last, false);
+ }
+ else {
+ if (last == null)
+ return map.tailMap(first, false);
+ else {
+ if (compare(first, last) > 0)
+ return null;
+
+ return map.subMap(first, false, last, false);
+ }
+ }
+ }
+
+ /**
+ * Gets iterator over all rows in this index.
+ *
+ * @return Rows iterator.
+ */
+ Iterator<GridH2Row> rows() {
+ return doFind(null, false, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canGetFirstOrLast() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor findFirstOrLast(Session ses, boolean first) {
+ ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree = treeForRead();
+
+ Iterator<GridH2Row> iter = filter(first ? tree.values().iterator() : tree.descendingMap().values().iterator());
+
+ GridSearchRowPointer res = null;
+
+ if (iter.hasNext())
+ res = iter.next();
+
+ return new SingleRowCursor((Row)res);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridH2Row put(GridH2Row row) {
+ return tree.put(row, row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridH2Row remove(SearchRow row) {
+ return tree.remove(comparable(row, 0));
+ }
+
+ /**
+ * Comparable row with bias. Will be used for queries to have correct bounds (in case of multicolumn index
+ * and query on few first columns we will multiple equal entries in tree).
+ */
+ private class ComparableRow implements GridSearchRowPointer, Comparable<SearchRow> {
+ /** */
+ private final SearchRow row;
+
+ /** */
+ private final int bias;
+
+ /**
+ * @param row Row.
+ * @param bias Bias.
+ */
+ private ComparableRow(SearchRow row, int bias) {
+ this.row = row;
+ this.bias = bias;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareTo(SearchRow o) {
+ int res = compareRows(o, row);
+
+ if (res == 0)
+ return bias;
+
+ return -res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ throw new IllegalStateException("Should never be called.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getColumnCount() {
+ return row.getColumnCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Value getValue(int idx) {
+ return row.getValue(idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setValue(int idx, Value v) {
+ row.setValue(idx, v);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setKeyAndVersion(SearchRow old) {
+ row.setKeyAndVersion(old);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getVersion() {
+ return row.getVersion();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setKey(long key) {
+ row.setKey(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getKey() {
+ return row.getKey();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getMemory() {
+ return row.getMemory();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long pointer() {
+ throw new IllegalStateException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void incrementRefCount() {
+ throw new IllegalStateException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void decrementRefCount() {
+ throw new IllegalStateException();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridH2TreeIndex rebuild() throws InterruptedException {
+ IndexColumn[] cols = getIndexColumns();
+
+ if (!getIndexType().isUnique())
+ cols = Arrays.copyOf(cols, cols.length - 1);
+
+ GridH2TreeIndex idx = new GridH2TreeIndex(getName(), (GridH2Table)getTable(), getIndexType().isUnique(),
+ keyCol, valCol, cols);
+
+ Thread thread = Thread.currentThread();
+
+ long i = 0;
+
+ for (GridH2Row row : tree.values()) {
+ // Check for interruptions every 1000 iterations.
+ if (++i % 1000 == 0 && thread.isInterrupted())
+ throw new InterruptedException();
+
+ idx.tree.put(row, row);
+ }
+
+ return idx;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Utils.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Utils.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Utils.java
new file mode 100644
index 0000000..661945f
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Utils.java
@@ -0,0 +1,125 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+import org.h2.value.*;
+
+import java.sql.*;
+import java.util.*;
+
+/**
+ *
+ */
+@SuppressWarnings({"JavaAbbreviationUsage", "GridBracket"})
+public class GridH2Utils {
+ /** Copy/pasted from org.h2.util.DateTimeUtils */
+ private static final int SHIFT_YEAR = 9;
+
+ /** Copy/pasted from org.h2.util.DateTimeUtils */
+ private static final int SHIFT_MONTH = 5;
+
+ /** Static calendar. */
+ private static final Calendar staticCalendar = Calendar.getInstance();
+
+ /** */
+ private static final ThreadLocal<Calendar> localCalendar = new ThreadLocal<>();
+
+ /**
+ * @return The instance of calendar for local thread.
+ */
+ public static Calendar getLocalCalendar() {
+ Calendar res = localCalendar.get();
+
+ if (res == null) {
+ res = (Calendar)staticCalendar.clone();
+
+ localCalendar.set(res);
+ }
+
+ return res;
+ }
+
+ /**
+ * Get or create a timestamp value for the given timestamp.
+ *
+ * Copy/pasted from org.h2.value.ValueTimestamp#get(java.sql.Timestamp)
+ *
+ * @param timestamp The timestamp.
+ * @return The value.
+ */
+ public static ValueTimestamp toValueTimestamp(Timestamp timestamp) {
+ long ms = timestamp.getTime();
+ long nanos = timestamp.getNanos() % 1000000;
+
+ Calendar calendar = getLocalCalendar();
+
+ calendar.clear();
+ calendar.setTimeInMillis(ms);
+
+ long dateValue = dateValueFromCalendar(calendar);
+
+ nanos += nanosFromCalendar(calendar);
+
+ return ValueTimestamp.fromDateValueAndNanos(dateValue, nanos);
+ }
+
+ /**
+ * Calculate the nanoseconds since midnight from a given calendar.
+ *
+ * Copy/pasted from org.h2.util.DateTimeUtils#nanosFromCalendar(java.util.Calendar).
+ *
+ * @param cal The calendar.
+ * @return Nanoseconds.
+ */
+ private static long nanosFromCalendar(Calendar cal) {
+ int h = cal.get(Calendar.HOUR_OF_DAY);
+ int m = cal.get(Calendar.MINUTE);
+ int s = cal.get(Calendar.SECOND);
+ int millis = cal.get(Calendar.MILLISECOND);
+
+ return ((((((h * 60L) + m) * 60) + s) * 1000) + millis) * 1000000;
+ }
+
+ /**
+ * Calculate the date value from a given calendar.
+ *
+ * Copy/pasted from org.h2.util.DateTimeUtils#dateValueFromCalendar(java.util.Calendar)
+ *
+ * @param cal The calendar.
+ * @return The date value.
+ */
+ private static long dateValueFromCalendar(Calendar cal) {
+ int year, month, day;
+
+ year = getYear(cal);
+ month = cal.get(Calendar.MONTH) + 1;
+ day = cal.get(Calendar.DAY_OF_MONTH);
+
+ return ((long) year << SHIFT_YEAR) | (month << SHIFT_MONTH) | day;
+ }
+
+ /**
+ * Get the year (positive or negative) from a calendar.
+ *
+ * Copy/pasted from org.h2.util.DateTimeUtils#getYear(java.util.Calendar)
+ *
+ * @param calendar The calendar.
+ * @return The year.
+ */
+ private static int getYear(Calendar calendar) {
+ int year = calendar.get(Calendar.YEAR);
+
+ if (calendar.get(Calendar.ERA) == GregorianCalendar.BC) {
+ year = 1 - year;
+ }
+
+ return year;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneDirectory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneDirectory.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneDirectory.java
new file mode 100644
index 0000000..6bd0f46
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneDirectory.java
@@ -0,0 +1,189 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+import org.apache.lucene.store.*;
+import org.gridgain.grid.util.offheap.unsafe.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * A memory-resident {@link Directory} implementation.
+ */
+public class GridLuceneDirectory extends Directory {
+ /** */
+ protected final Map<String, GridLuceneFile> fileMap = new ConcurrentHashMap<>();
+
+ /** */
+ protected final AtomicLong sizeInBytes = new AtomicLong();
+
+ /** */
+ private final GridUnsafeMemory mem;
+
+ /**
+ * Constructs an empty {@link Directory}.
+ *
+ * @param mem Memory.
+ */
+ public GridLuceneDirectory(GridUnsafeMemory mem) {
+ this.mem = mem;
+
+ try {
+ setLockFactory(new GridLuceneLockFactory());
+ }
+ catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public final String[] listAll() {
+ ensureOpen();
+ // NOTE: fileMap.keySet().toArray(new String[0]) is broken in non Sun JDKs,
+ // and the code below is resilient to map changes during the array population.
+ Set<String> fileNames = fileMap.keySet();
+
+ List<String> names = new ArrayList<>(fileNames.size());
+
+ for (String name : fileNames)
+ names.add(name);
+
+ return names.toArray(new String[names.size()]);
+ }
+
+ /** {@inheritDoc} */
+ @Override public final boolean fileExists(String name) {
+ ensureOpen();
+
+ return fileMap.containsKey(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public final long fileModified(String name) {
+ ensureOpen();
+
+ throw new IllegalStateException(name);
+ }
+
+ /**
+ * Set the modified time of an existing file to now.
+ *
+ * @throws IOException if the file does not exist
+ */
+ @Override public void touchFile(String name) throws IOException {
+ ensureOpen();
+
+ throw new IllegalStateException(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public final long fileLength(String name) throws IOException {
+ ensureOpen();
+
+ GridLuceneFile file = fileMap.get(name);
+
+ if (file == null)
+ throw new FileNotFoundException(name);
+
+ return file.getLength();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deleteFile(String name) throws IOException {
+ ensureOpen();
+
+ doDeleteFile(name);
+ }
+
+ /**
+ * Deletes file.
+ *
+ * @param name File name.
+ * @throws IOException If failed.
+ */
+ private void doDeleteFile(String name) throws IOException {
+ GridLuceneFile file = fileMap.remove(name);
+
+ if (file != null) {
+ file.delete();
+
+ sizeInBytes.addAndGet(-file.getSizeInBytes());
+ }
+ else
+ throw new FileNotFoundException(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IndexOutput createOutput(String name) throws IOException {
+ ensureOpen();
+
+ GridLuceneFile file = newRAMFile();
+
+ GridLuceneFile existing = fileMap.remove(name);
+
+ if (existing != null) {
+ sizeInBytes.addAndGet(-existing.getSizeInBytes());
+
+ existing.delete();
+ }
+
+ fileMap.put(name, file);
+
+ return new GridLuceneOutputStream(file);
+ }
+
+ /**
+ * Returns a new {@link GridLuceneFile} for storing data. This method can be
+ * overridden to return different {@link GridLuceneFile} impls, that e.g. override.
+ *
+ * @return New ram file.
+ */
+ protected GridLuceneFile newRAMFile() {
+ return new GridLuceneFile(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IndexInput openInput(String name) throws IOException {
+ ensureOpen();
+
+ GridLuceneFile file = fileMap.get(name);
+
+ if (file == null)
+ throw new FileNotFoundException(name);
+
+ return new GridLuceneInputStream(name, file);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ isOpen = false;
+
+ for (String fileName : fileMap.keySet()) {
+ try {
+ doDeleteFile(fileName);
+ }
+ catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ assert fileMap.isEmpty();
+ }
+
+ /**
+ * @return Memory.
+ */
+ GridUnsafeMemory memory() {
+ return mem;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneFile.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneFile.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneFile.java
new file mode 100644
index 0000000..6461822
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridLuceneFile.java
@@ -0,0 +1,186 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.gridgain.grid.kernal.processors.query.h2.opt.GridLuceneOutputStream.*;
+
+/**
+ * Lucene file whose content is kept in fixed-size buffers of {@code BUFFER_SIZE} bytes
+ * allocated from the memory of the owning {@link GridLuceneDirectory}. Buffers are
+ * tracked by address and must be released explicitly with {@link #delete()}.
+ */
+public class GridLuceneFile implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Number of files that have been created and not yet deleted. */
+    public static final AtomicInteger filesCnt = new AtomicInteger();
+
+    /** Addresses of the allocated buffers; {@code null} once the file has been deleted. */
+    private LongArray buffers = new LongArray();
+
+    /** File length as last set by {@link #setLength(long)}. */
+    private long length;
+
+    /** Owning directory. */
+    private final GridLuceneDirectory dir;
+
+    /** Total size of the buffers added to this file, in bytes. Not reset by {@link #delete()}. */
+    private volatile long sizeInBytes;
+
+    /**
+     * Creates an empty file and counts it in {@link #filesCnt}.
+     *
+     * @param dir Directory.
+     */
+    GridLuceneFile(GridLuceneDirectory dir) {
+        this.dir = dir;
+
+        filesCnt.incrementAndGet();
+    }
+
+    /**
+     * For non-stream access from a thread that might be concurrent with writing.
+     *
+     * @return Length.
+     */
+    public synchronized long getLength() {
+        return length;
+    }
+
+    /**
+     * Sets length.
+     *
+     * @param length Length.
+     */
+    protected synchronized void setLength(long length) {
+        this.length = length;
+    }
+
+    /**
+     * Allocates a new buffer via {@link #newBuffer()}, appends it to this file and adds
+     * its size to the file and directory totals.
+     *
+     * @return New buffer address.
+     * @throws IllegalStateException If the file has already been deleted.
+     */
+    final long addBuffer() {
+        // Allocation is done outside of the lock, so the file may get deleted meanwhile.
+        long buf = newBuffer();
+
+        synchronized (this) {
+            if (buffers == null) {
+                // delete() has already released every buffer it knew about, so nothing
+                // would ever free this one: give it back (the same way delete() does)
+                // before failing instead of leaking it.
+                dir.memory().release(buf, BUFFER_SIZE);
+
+                throw new IllegalStateException("File has been deleted.");
+            }
+
+            buffers.add(buf);
+
+            sizeInBytes += BUFFER_SIZE;
+        }
+
+        // The default newBuffer() dereferences dir, so it can only be null here when a
+        // subclass overrides allocation.
+        if (dir != null)
+            dir.sizeInBytes.getAndAdd(BUFFER_SIZE);
+
+        return buf;
+    }
+
+    /**
+     * Gets address of buffer. Must not be called after {@link #delete()}.
+     *
+     * @param idx Index.
+     * @return Pointer.
+     */
+    protected final synchronized long getBuffer(int idx) {
+        return buffers.get(idx);
+    }
+
+    /**
+     * Must not be called after {@link #delete()}.
+     *
+     * @return Number of buffers.
+     */
+    protected final synchronized int numBuffers() {
+        return buffers.size();
+    }
+
+    /**
+     * Expert: allocate a new buffer.
+     * Subclasses can allocate differently.
+     *
+     * @return Address of the allocated buffer.
+     */
+    protected long newBuffer() {
+        return dir.memory().allocate(BUFFER_SIZE);
+    }
+
+    /**
+     * Deletes file and deallocates memory. Calls after the first one are no-ops.
+     */
+    public synchronized void delete() {
+        if (buffers == null)
+            return;
+
+        for (int i = 0; i < buffers.size(); i++)
+            dir.memory().release(buffers.get(i), BUFFER_SIZE);
+
+        // Marks the file as deleted.
+        buffers = null;
+
+        filesCnt.decrementAndGet();
+    }
+
+    /**
+     * @return Size in bytes.
+     */
+    public long getSizeInBytes() {
+        return sizeInBytes;
+    }
+
+    /**
+     * @return Directory.
+     */
+    public GridLuceneDirectory getDirectory() {
+        return dir;
+    }
+
+    /**
+     * Simple expandable long[] wrapper.
+     */
+    private static class LongArray {
+        /** Backing storage; only the first {@link #idx} elements are in use. */
+        private long[] arr = new long[128];
+
+        /** Number of stored values, which is also the next write position. */
+        private int idx;
+
+        /**
+         * @return Size.
+         */
+        int size() {
+            return idx;
+        }
+
+        /**
+         * Gets value by index.
+         *
+         * @param idx Index.
+         * @return Value.
+         */
+        long get(int idx) {
+            assert idx < this.idx;
+
+            return arr[idx];
+        }
+
+        /**
+         * Adds value, growing the backing array when it is full.
+         *
+         * @param val Value.
+         */
+        void add(long val) {
+            int len = arr.length;
+
+            // Double the capacity while that adds less than 1024 slots, then grow by 1024.
+            if (idx == len)
+                arr = Arrays.copyOf(arr, Math.min(len * 2, len + 1024));
+
+            arr[idx++] = val;
+        }
+    }
+}