You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/11/21 09:26:36 UTC
ignite git commit: IGNITE-6931: SQL: simplify index rebuild. Now both
CREATE INDEX and "rebuild from hash" routines use the same iteration and
entry locking logic. This closes #3052.
Repository: ignite
Updated Branches:
refs/heads/master db343b649 -> 65080675d
IGNITE-6931: SQL: simplify index rebuild. Now both CREATE INDEX and "rebuild from hash" routines use the same iteration and entry locking logic. This closes #3052.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/65080675
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/65080675
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/65080675
Branch: refs/heads/master
Commit: 65080675dec8433c0968762c903905d1f65e5f29
Parents: db343b6
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Tue Nov 21 12:26:23 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Nov 21 12:26:23 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheEntryEx.java | 13 +--
.../processors/cache/GridCacheMapEntry.java | 17 +--
.../cache/IgniteCacheOffheapManager.java | 19 ----
.../cache/IgniteCacheOffheapManagerImpl.java | 21 ----
.../persistence/GridCacheOffheapManager.java | 7 --
.../processors/query/GridQueryProcessor.java | 51 ++++++++-
.../query/schema/SchemaIndexCacheFilter.java | 33 ++++++
.../schema/SchemaIndexCacheVisitorImpl.java | 108 +++++++++----------
.../processors/cache/GridCacheTestEntryEx.java | 10 +-
.../processors/query/h2/IgniteH2Indexing.java | 64 ++++-------
10 files changed, 166 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 6da7bc7..313a0c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.dr.GridDrType;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.util.lang.GridTuple3;
import org.jetbrains.annotations.Nullable;
@@ -869,12 +870,13 @@ public interface GridCacheEntryEx {
/**
* Update index from within entry lock, passing key, value, and expiration time to provided closure.
*
+ * @param filter Row filter.
* @param clo Closure to apply to key, value, and expiration time.
* @throws IgniteCheckedException If failed.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- public void updateIndex(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException,
- GridCacheEntryRemovedException;
+ public void updateIndex(SchemaIndexCacheFilter filter, SchemaIndexCacheVisitorClosure clo)
+ throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
* @return Expire time, without accounting for transactions or removals.
@@ -919,13 +921,6 @@ public interface GridCacheEntryEx {
public void updateTtl(@Nullable GridCacheVersion ver, long ttl) throws GridCacheEntryRemovedException;
/**
- * Ensures that the value stored in the entry is also inserted in the indexing.
- *
- * @throws GridCacheEntryRemovedException If entry was removed.
- */
- public void ensureIndexed() throws GridCacheEntryRemovedException, IgniteCheckedException;
-
- /**
* @return Value.
* @throws IgniteCheckedException If failed to read from swap storage.
* @throws GridCacheEntryRemovedException If entry was removed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 778a46e..79390ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConfl
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.dr.GridDrType;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.util.IgniteTree;
import org.apache.ignite.internal.util.lang.GridClosureException;
@@ -3141,16 +3142,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/** {@inheritDoc} */
- @Override public void ensureIndexed() throws GridCacheEntryRemovedException, IgniteCheckedException {
- synchronized (this) {
- checkObsolete();
-
- if (cctx.queries().enabled())
- cctx.offheap().updateIndexes(cctx, key, localPartition());
- }
- }
-
- /** {@inheritDoc} */
@Override public synchronized CacheObject valueBytes() throws GridCacheEntryRemovedException {
checkObsolete();
@@ -3300,8 +3291,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/** {@inheritDoc} */
- @Override public void updateIndex(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException,
- GridCacheEntryRemovedException {
+ @Override public void updateIndex(SchemaIndexCacheFilter filter, SchemaIndexCacheVisitorClosure clo)
+ throws IgniteCheckedException, GridCacheEntryRemovedException {
synchronized (this) {
if (isInternal())
return;
@@ -3310,7 +3301,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheDataRow row = cctx.offheap().read(this);
- if (row != null)
+ if (row != null && (filter == null || filter.apply(row)))
clo.apply(row);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 761b787..84c69a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -186,18 +186,6 @@ public interface IgniteCacheOffheapManager {
/**
* @param cctx Cache context.
* @param key Key.
- * @param part Partition.
- * @throws IgniteCheckedException If failed.
- */
- public void updateIndexes(
- GridCacheContext cctx,
- KeyCacheObject key,
- GridDhtLocalPartition part
- ) throws IgniteCheckedException;
-
- /**
- * @param cctx Cache context.
- * @param key Key.
* @param partId Partition number.
* @param part Partition.
* @throws IgniteCheckedException If failed.
@@ -454,13 +442,6 @@ public interface IgniteCacheOffheapManager {
/**
* @param cctx Cache context.
* @param key Key.
- * @throws IgniteCheckedException If failed.
- */
- void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException;
-
- /**
- * @param cctx Cache context.
- * @param key Key.
* @param c Closure.
* @throws IgniteCheckedException If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index c85ba1d..370a92e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -359,12 +359,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition part)
- throws IgniteCheckedException {
- dataStore(part).updateIndexes(cctx, key);
- }
-
- /** {@inheritDoc} */
@Override public void remove(
GridCacheContext cctx,
KeyCacheObject key,
@@ -1362,21 +1356,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
- int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
-
- CacheDataRow row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY);
-
- if (row != null) {
- row.key(key);
-
- GridCacheQueryManager qryMgr = cctx.queries();
-
- qryMgr.store(row, null, false);
- }
- }
-
- /** {@inheritDoc} */
@Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) throws IgniteCheckedException {
if (!busyLock.enterBusy())
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 6ed62f8..cfa1829 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1236,13 +1236,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
- CacheDataStore delegate = init0(false);
-
- delegate.updateIndexes(cctx, key);
- }
-
- /** {@inheritDoc} */
@Override public CacheDataRow createRow(
GridCacheContext cctx,
KeyCacheObject key,
http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index ed5fdd9..471888a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.property.QueryBinaryProperty;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
@@ -96,10 +97,12 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.util.worker.GridWorkerFuture;
@@ -1319,12 +1322,15 @@ public class GridQueryProcessor extends GridProcessorAdapter {
try {
if (op instanceof SchemaIndexCreateOperation) {
- SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op;
+ final SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op;
QueryIndexDescriptorImpl idxDesc = QueryUtils.createIndexDescriptor(type, op0.index());
- SchemaIndexCacheVisitor visitor =
- new SchemaIndexCacheVisitorImpl(this, cache.context(), cacheName, op0.tableName(), cancelTok);
+ GridCacheContext cctx = cache.context();
+
+ SchemaIndexCacheFilter filter = new TableCacheFilter(cctx, op0.tableName());
+
+ SchemaIndexCacheVisitor visitor = new SchemaIndexCacheVisitorImpl(cctx, filter, cancelTok);
idx.dynamicIndexCreate(op0.schemaName(), op0.tableName(), idxDesc, op0.ifNotExists(), visitor);
}
@@ -2869,4 +2875,43 @@ public class GridQueryProcessor extends GridProcessorAdapter {
this.mgr = mgr;
}
}
+
+ /** Row filter that accepts a row only if the query processor reports that it belongs to the target table. */
+ private static class TableCacheFilter implements SchemaIndexCacheFilter {
+ /** Cache context passed to the table membership check. */
+ @GridToStringExclude
+ private final GridCacheContext cctx;
+
+ /** Query processor, obtained from the kernal context of {@link #cctx}. */
+ @GridToStringExclude
+ private final GridQueryProcessor query;
+
+ /** Cache name, taken from {@link #cctx}. */
+ private final String cacheName;
+
+ /** Target table name. */
+ private final String tableName;
+
+ /**
+ * @param cctx Cache context.
+ * @param tableName Target table name.
+ */
+ TableCacheFilter(GridCacheContext cctx, String tableName) {
+ this.cctx = cctx;
+ this.tableName = tableName;
+
+ cacheName = cctx.name();
+ query = cctx.kernalContext().query();
+ }
+
+ /** {@inheritDoc} Delegates to {@code GridQueryProcessor.belongsToTable} using the row's key and value. */
+ @Override public boolean apply(CacheDataRow row) throws IgniteCheckedException {
+ return query.belongsToTable(cctx, cacheName, tableName, row.key(), row.value());
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TableCacheFilter.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheFilter.java
new file mode 100644
index 0000000..32600c6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheFilter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+
+/**
+ * Filter over cache data rows: decides, per row, whether the row passes and should be processed further.
+ */
+public interface SchemaIndexCacheFilter {
+ /**
+ * @param row Cache data row.
+ * @return {@code True} if row passes the filter.
+ * @throws IgniteCheckedException If failed.
+ */
+ boolean apply(CacheDataRow row) throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
index fda7d1d..c11c614 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.processors.query.schema;
-import java.util.Collection;
+import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -29,7 +29,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
-import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -40,35 +39,36 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
/**
* Traversor operating all primary and backup partitions of given cache.
*/
+@SuppressWarnings("ForLoopReplaceableByForEach")
public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
- /** Query procssor. */
- private final GridQueryProcessor qryProc;
+ /** Count of rows, being processed within a single checkpoint lock. */
+ private static final int BATCH_SIZE = 1000;
/** Cache context. */
private final GridCacheContext cctx;
- /** Cache name. */
- private final String cacheName;
-
- /** Table name. */
- private final String tblName;
+ /** Row filter. */
+ private final SchemaIndexCacheFilter rowFilter;
/** Cancellation token. */
private final SchemaIndexOperationCancellationToken cancel;
/**
* Constructor.
- *
- * @param cctx Cache context.
- * @param cacheName Cache name.
- * @param tblName Table name.
+ * @param cctx Cache context.
+ */
+ public SchemaIndexCacheVisitorImpl(GridCacheContext cctx) {
+ this(cctx, null, null);
+ }
+
+ /**
+ * Constructor.
+ * @param cctx Cache context.
+ * @param rowFilter Row filter, or {@code null} to process every row.
* @param cancel Cancellation token.
*/
- public SchemaIndexCacheVisitorImpl(GridQueryProcessor qryProc, GridCacheContext cctx, String cacheName,
- String tblName, SchemaIndexOperationCancellationToken cancel) {
- this.qryProc = qryProc;
- this.cacheName = cacheName;
- this.tblName = tblName;
+ public SchemaIndexCacheVisitorImpl(GridCacheContext cctx, SchemaIndexCacheFilter rowFilter,
+ SchemaIndexOperationCancellationToken cancel) {
+ this.rowFilter = rowFilter;
this.cancel = cancel;
if (cctx.isNear())
@@ -81,12 +81,10 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
@Override public void visit(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException {
assert clo != null;
- FilteringVisitorClosure filterClo = new FilteringVisitorClosure(clo);
-
- Collection<GridDhtLocalPartition> parts = cctx.topology().localPartitions();
+ List<GridDhtLocalPartition> parts = cctx.topology().localPartitions();
- for (GridDhtLocalPartition part : parts)
- processPartition(part, filterClo);
+ for (int i = 0, size = parts.size(); i < size; i++)
+ processPartition(parts.get(i), clo);
}
/**
@@ -96,7 +94,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
* @param clo Index closure.
* @throws IgniteCheckedException If failed.
*/
- private void processPartition(GridDhtLocalPartition part, FilteringVisitorClosure clo)
+ private void processPartition(GridDhtLocalPartition part, SchemaIndexCacheVisitorClosure clo)
throws IgniteCheckedException {
checkCancelled();
@@ -114,15 +112,35 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
null,
CacheDataRowAdapter.RowData.KEY_ONLY);
- while (cursor.next()) {
- CacheDataRow row = cursor.get();
+ boolean locked = false;
+
+ try {
+ int cntr = 0;
+
+ while (cursor.next()) {
+ KeyCacheObject key = cursor.get().key();
+
+ if (!locked) {
+ cctx.shared().database().checkpointReadLock();
- KeyCacheObject key = row.key();
+ locked = true;
+ }
- processKey(key, clo);
+ processKey(key, clo);
- if (part.state() == RENTING)
- break;
+ if (++cntr % BATCH_SIZE == 0) {
+ cctx.shared().database().checkpointReadUnlock();
+
+ locked = false;
+ }
+
+ if (part.state() == RENTING)
+ break;
+ }
+ }
+ finally {
+ if (locked)
+ cctx.shared().database().checkpointReadUnlock();
}
}
finally {
@@ -137,7 +155,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
* @param clo Closure.
* @throws IgniteCheckedException If failed.
*/
- private void processKey(KeyCacheObject key, FilteringVisitorClosure clo) throws IgniteCheckedException {
+ private void processKey(KeyCacheObject key, SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException {
while (true) {
try {
checkCancelled();
@@ -145,7 +163,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
GridCacheEntryEx entry = cctx.cache().entryEx(key);
try {
- entry.updateIndex(clo);
+ entry.updateIndex(rowFilter, clo);
}
finally {
cctx.evicts().touch(entry, AffinityTopologyVersion.NONE);
@@ -168,7 +186,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
* @throws IgniteCheckedException If cancelled.
*/
private void checkCancelled() throws IgniteCheckedException {
- if (cancel.isCancelled())
+ if (cancel != null && cancel.isCancelled())
throw new IgniteCheckedException("Index creation was cancelled.");
}
@@ -176,28 +194,4 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
@Override public String toString() {
return S.toString(SchemaIndexCacheVisitorImpl.class, this);
}
-
- /**
- * Filtering visitor closure.
- */
- private class FilteringVisitorClosure implements SchemaIndexCacheVisitorClosure {
-
- /** Target closure. */
- private final SchemaIndexCacheVisitorClosure target;
-
- /**
- * Constructor.
- *
- * @param target Target.
- */
- FilteringVisitorClosure(SchemaIndexCacheVisitorClosure target) {
- this.target = target;
- }
-
- /** {@inheritDoc} */
- @Override public void apply(CacheDataRow row) throws IgniteCheckedException {
- if (qryProc.belongsToTable(cctx, cacheName, tblName, row.key(), row.value()))
- target.apply(row);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 2ba8fd8..3d7edac 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.dr.GridDrType;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
import org.apache.ignite.internal.util.lang.GridTuple3;
@@ -817,11 +818,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
}
/** {@inheritDoc} */
- @Override public void ensureIndexed() throws GridCacheEntryRemovedException, IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
@Override public CacheObject unswap() throws IgniteCheckedException {
return null;
}
@@ -842,8 +838,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
}
/** {@inheritDoc} */
- @Override public void updateIndex(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException,
- GridCacheEntryRemovedException {
+ @Override public void updateIndex(SchemaIndexCacheFilter filter, SchemaIndexCacheVisitorClosure clo)
+ throws IgniteCheckedException, GridCacheEntryRemovedException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index c37e5f0..f3a95a5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -65,17 +65,13 @@ import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.query.CacheQueryPartitionInfo;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
@@ -116,6 +112,7 @@ import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExe
import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.sql.SqlParseException;
import org.apache.ignite.internal.sql.SqlParser;
@@ -2021,44 +2018,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId);
- IgniteCacheOffheapManager offheapMgr = cctx.isNear() ? cctx.near().dht().context().offheap() : cctx.offheap();
+ final GridCacheQueryManager qryMgr = cctx.queries();
- for (int p = 0; p < cctx.affinity().partitions(); p++) {
- try (GridCloseableIterator<KeyCacheObject> keyIter = offheapMgr.cacheKeysIterator(cctx.cacheId(), p)) {
- while (keyIter.hasNext()) {
- cctx.shared().database().checkpointReadLock();
+ SchemaIndexCacheVisitor visitor = new SchemaIndexCacheVisitorImpl(cctx);
- try {
- KeyCacheObject key = keyIter.next();
-
- while (true) {
- GridCacheEntryEx entry = null;
-
- try {
- entry = cctx.isNear() ?
- cctx.near().dht().entryEx(key) : cctx.cache().entryEx(key);
-
- entry.ensureIndexed();
-
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- // Retry.
- }
- catch (GridDhtInvalidPartitionException ignore) {
- break;
- }
- finally {
- entry.context().evicts().touch(entry, AffinityTopologyVersion.NONE);
- }
- }
- }
- finally {
- cctx.shared().database().checkpointReadUnlock();
- }
- }
- }
- }
+ visitor.visit(new RebuldIndexFromHashClosure(qryMgr));
for (H2TableDescriptor tblDesc : tables(cacheName))
tblDesc.table().markRebuildFromHashInProgress(false);
@@ -2653,4 +2617,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private interface ClIter<X> extends AutoCloseable, Iterator<X> {
// No-op.
}
+
+ /** Visitor closure that hands every visited row to the query manager's {@code store} method. */
+ private static class RebuldIndexFromHashClosure implements SchemaIndexCacheVisitorClosure {
+ /** Query manager that receives the rows. */
+ private final GridCacheQueryManager qryMgr;
+
+ /**
+ * @param qryMgr Query manager.
+ */
+ RebuldIndexFromHashClosure(GridCacheQueryManager qryMgr) {
+ this.qryMgr = qryMgr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void apply(CacheDataRow row) throws IgniteCheckedException {
+ qryMgr.store(row, null, false);
+ }
+ }
}