You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/06/12 13:17:01 UTC
[06/24] incubator-ignite git commit: ignite-sprint-6: merge from
ignite-sprint-5
ignite-sprint-6: merge from ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a5d007e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a5d007e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a5d007e3
Branch: refs/heads/ignite-gg-10411
Commit: a5d007e323af2d6619c11fb698992c8020d5eefa
Parents: d4b9731
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jun 11 12:34:49 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jun 11 12:34:49 2015 +0300
----------------------------------------------------------------------
DEVNOTES.txt | 6 +
assembly/dependencies-fabric.xml | 1 +
examples/pom.xml | 34 ++
modules/core/pom.xml | 1 -
.../apache/ignite/IgniteSystemProperties.java | 3 +
.../apache/ignite/cache/query/ScanQuery.java | 45 +-
.../configuration/CacheConfiguration.java | 1 -
.../affinity/GridAffinityAssignmentCache.java | 5 +-
.../processors/cache/GridCacheAdapter.java | 15 +-
.../GridCachePartitionExchangeManager.java | 2 +-
.../processors/cache/GridCacheProcessor.java | 30 +-
.../processors/cache/GridCacheSwapManager.java | 55 ++-
.../processors/cache/IgniteCacheProxy.java | 11 +-
.../processors/cache/QueryCursorImpl.java | 23 +-
.../distributed/dht/GridDhtLocalPartition.java | 7 +
.../processors/cache/query/CacheQuery.java | 2 +-
.../query/GridCacheDistributedQueryManager.java | 3 +
.../cache/query/GridCacheQueryAdapter.java | 147 ++++++-
.../cache/query/GridCacheQueryManager.java | 209 ++++++----
.../cache/query/GridCacheQueryRequest.java | 47 ++-
.../processors/cache/query/QueryCursorEx.java | 8 +
.../datastructures/GridCacheSetImpl.java | 4 +-
.../processors/query/GridQueryIndexing.java | 4 +-
.../processors/query/GridQueryProcessor.java | 18 +-
.../service/GridServiceProcessor.java | 2 +-
.../ignite/internal/util/GridJavaProcess.java | 2 +-
.../ignite/internal/util/IgniteUtils.java | 4 +-
.../shmem/IpcSharedMemoryClientEndpoint.java | 2 +-
.../ipc/shmem/IpcSharedMemoryNativeLoader.java | 151 ++++++-
.../shmem/IpcSharedMemoryServerEndpoint.java | 2 +-
.../util/nio/GridShmemCommunicationClient.java | 146 +++++++
.../communication/tcp/TcpCommunicationSpi.java | 415 ++++++++++++++++++-
.../tcp/TcpCommunicationSpiMBean.java | 8 +
.../cache/GridCacheAbstractFullApiSelfTest.java | 15 +
.../cache/IgniteDynamicCacheStartSelfTest.java | 19 +
.../distributed/IgniteCacheManyClientsTest.java | 169 ++++++++
.../IgniteCacheMessageRecoveryAbstractTest.java | 1 +
...achePartitionedPreloadLifecycleSelfTest.java | 2 +-
...CacheReplicatedPreloadLifecycleSelfTest.java | 6 +-
.../GridCacheSwapScanQueryAbstractSelfTest.java | 112 +++--
.../ipc/shmem/IgfsSharedMemoryTestServer.java | 2 +
.../IpcSharedMemoryCrashDetectionSelfTest.java | 2 +-
.../ipc/shmem/IpcSharedMemorySpaceSelfTest.java | 2 +-
.../ipc/shmem/IpcSharedMemoryUtilsSelfTest.java | 2 +-
.../LoadWithCorruptedLibFileTestRunner.java | 2 +-
.../IpcSharedMemoryBenchmarkReader.java | 2 +-
.../IpcSharedMemoryBenchmarkWriter.java | 2 +-
.../communication/GridIoManagerBenchmark0.java | 1 +
.../spi/GridTcpSpiForwardingSelfTest.java | 1 +
.../GridTcpCommunicationSpiAbstractTest.java | 13 +
...mmunicationSpiConcurrentConnectSelfTest.java | 4 +-
...cpCommunicationSpiMultithreadedSelfTest.java | 21 +-
...pCommunicationSpiMultithreadedShmemTest.java | 28 ++
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 1 +
...GridTcpCommunicationSpiRecoverySelfTest.java | 1 +
.../GridTcpCommunicationSpiShmemSelfTest.java | 38 ++
.../tcp/GridTcpCommunicationSpiTcpSelfTest.java | 7 +
.../testsuites/IgniteCacheTestSuite4.java | 2 +
.../IgniteSpiCommunicationSelfTestSuite.java | 2 +
modules/hadoop/pom.xml | 1 +
.../HadoopIgfs20FileSystemAbstractSelfTest.java | 13 +
...oopSecondaryFileSystemConfigurationTest.java | 14 +
...IgniteHadoopFileSystemHandshakeSelfTest.java | 7 +
.../IgniteHadoopFileSystemIpcCacheSelfTest.java | 7 +
.../hadoop/HadoopAbstractSelfTest.java | 7 +
.../processors/query/h2/IgniteH2Indexing.java | 44 +-
.../h2/twostep/GridReduceQueryExecutor.java | 8 +-
...CacheScanPartitionQueryFallbackSelfTest.java | 408 ++++++++++++++++++
.../cache/GridCacheCrossCacheQuerySelfTest.java | 12 +-
.../cache/IgniteCacheAbstractQuerySelfTest.java | 77 +++-
.../IgniteCacheQuerySelfTestSuite.java | 2 +
modules/scalar-2.10/README.txt | 4 +
modules/scalar-2.10/licenses/apache-2.0.txt | 202 +++++++++
.../scalar-2.10/licenses/scala-bsd-license.txt | 18 +
modules/scalar-2.10/pom.xml | 197 +++++++++
modules/spark-2.10/README.txt | 4 +
modules/spark-2.10/licenses/apache-2.0.txt | 202 +++++++++
.../spark-2.10/licenses/scala-bsd-license.txt | 18 +
modules/spark-2.10/pom.xml | 120 ++++++
modules/spark/README.txt | 8 +
modules/spark/licenses/apache-2.0.txt | 202 +++++++++
modules/spark/licenses/scala-bsd-license.txt | 18 +
modules/spark/pom.xml | 114 +++++
.../org/apache/ignite/spark/IgniteContext.scala | 119 ++++++
.../org/apache/ignite/spark/IgniteRDD.scala | 244 +++++++++++
.../apache/ignite/spark/JavaIgniteContext.scala | 63 +++
.../org/apache/ignite/spark/JavaIgniteRDD.scala | 99 +++++
.../ignite/spark/impl/IgniteAbstractRDD.scala | 39 ++
.../ignite/spark/impl/IgnitePartition.scala | 24 ++
.../ignite/spark/impl/IgniteQueryIterator.scala | 27 ++
.../apache/ignite/spark/impl/IgniteSqlRDD.scala | 41 ++
.../spark/impl/JavaIgniteAbstractRDD.scala | 34 ++
.../ignite/spark/JavaIgniteRDDSelfTest.java | 298 +++++++++++++
.../scala/org/apache/ignite/spark/Entity.scala | 28 ++
.../org/apache/ignite/spark/IgniteRddSpec.scala | 231 +++++++++++
modules/visor-console-2.10/README.txt | 4 +
modules/visor-console-2.10/pom.xml | 174 ++++++++
parent/pom.xml | 4 +
pom.xml | 20 +-
99 files changed, 4773 insertions(+), 253 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/DEVNOTES.txt
----------------------------------------------------------------------
diff --git a/DEVNOTES.txt b/DEVNOTES.txt
index cd72418..d02e6ba 100644
--- a/DEVNOTES.txt
+++ b/DEVNOTES.txt
@@ -3,9 +3,15 @@ Ignite Fabric Maven Build Instructions
Without LGPL dependencies (default):
mvn clean package -DskipTests
+Without LGPL dependencies and Scala 2.10:
+ mvn clean package -DskipTests -Dscala-2.10
+
With LGPL dependencies:
mvn clean package -DskipTests -Prelease,lgpl
+With LGPL dependencies and Scala 2.10:
+ mvn clean package -DskipTests -Prelease,lgpl -Dscala-2.10
+
Look for incubator-ignite-<version>-bin.zip in ./target/bin directory.
NOTE: JDK version should be 1.7.0-* or >= 1.8.0-u40.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/assembly/dependencies-fabric.xml
----------------------------------------------------------------------
diff --git a/assembly/dependencies-fabric.xml b/assembly/dependencies-fabric.xml
index a294243..c6668f6 100644
--- a/assembly/dependencies-fabric.xml
+++ b/assembly/dependencies-fabric.xml
@@ -113,6 +113,7 @@
<exclude>org.apache.ignite:ignite-examples</exclude>
<exclude>org.apache.ignite:ignite-indexing</exclude>
<exclude>org.apache.ignite:ignite-visor-console</exclude>
+ <exclude>org.apache.ignite:ignite-visor-console_2.10</exclude>
<exclude>org.apache.ignite:ignite-visor-plugins</exclude>
<exclude>org.apache.ignite:ignite-visor-trial</exclude>
<exclude>org.apache.ignite:ignite-hadoop</exclude>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 78c5852..a775987 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -173,6 +173,40 @@
</profile>
<profile>
+ <id>scala-2.10</id>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-scalar_2.10</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_2.10</artifactId>
+ <version>2.2.2</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <profile>
<id>java8-examples</id>
<activation>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index 0460b46..47ed9cb 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -129,7 +129,6 @@
<groupId>org.gridgain</groupId>
<artifactId>ignite-shmem</artifactId>
<version>1.0.0</version>
- <scope>test</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 439ea2d..b166f39 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -337,6 +337,9 @@ public final class IgniteSystemProperties {
*/
public static final String IGNITE_SQL_MERGE_TABLE_MAX_SIZE = "IGNITE_SQL_MERGE_TABLE_MAX_SIZE";
+ /** Maximum size for affinity assignment history. */
+ public static final String IGNITE_AFFINITY_HISTORY_SIZE = "IGNITE_AFFINITY_HISTORY_SIZE";
+
/**
* Enforces singleton.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
index 688eb2e..e6b69bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
@@ -36,11 +36,23 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
/** */
private IgniteBiPredicate<K, V> filter;
+ /** */
+ private Integer part;
+
/**
* Create scan query returning all entries.
*/
public ScanQuery() {
- this(null);
+ this(null, null);
+ }
+
+ /**
+ * Creates partition scan query returning all entries for given partition.
+ *
+ * @param part Partition.
+ */
+ public ScanQuery(int part) {
+ this(part, null);
}
/**
@@ -49,6 +61,17 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
* @param filter Filter. If {@code null} then all entries will be returned.
*/
public ScanQuery(@Nullable IgniteBiPredicate<K, V> filter) {
+ this(null, filter);
+ }
+
+ /**
+ * Create scan query with filter.
+ *
+ * @param part Partition.
+ * @param filter Filter. If {@code null} then all entries will be returned.
+ */
+ public ScanQuery(@Nullable Integer part, @Nullable IgniteBiPredicate<K, V> filter) {
+ setPartition(part);
setFilter(filter);
}
@@ -73,6 +96,26 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
return this;
}
+ /**
+ * Gets partition number over which this query should iterate. Will return {@code null} if partition was not
+ * set. In this case query will iterate over all partitions in the cache.
+ *
+ * @return Partition number or {@code null}.
+ */
+ @Nullable public Integer getPartition() {
+ return part;
+ }
+
+ /**
+ * Sets partition number over which this query should iterate. If {@code null}, query will iterate over
+ * all partitions in the cache. Must be in the range [0, N) where N is partition number in the cache.
+ *
+ * @param part Partition number over which this query should iterate.
+ */
+ public void setPartition(@Nullable Integer part) {
+ this.part = part;
+ }
+
/** {@inheritDoc} */
@Override public ScanQuery<K, V> setPageSize(int pageSize) {
return (ScanQuery<K, V>)super.setPageSize(pageSize);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 1aa4fd6..a16438c 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -774,7 +774,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
*
* @param loadPrevVal Load previous value flag.
* @return {@code this} for chaining.
- * @return {@code this} for chaining.
*/
public CacheConfiguration setLoadPreviousValue(boolean loadPrevVal) {
this.loadPrevVal = loadPrevVal;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 47f222e..6989385 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -408,9 +408,10 @@ public class GridAffinityAssignmentCache {
throw new IllegalStateException("Getting affinity for topology version earlier than affinity is " +
"calculated [locNodeId=" + ctx.localNodeId() +
", cache=" + cacheName +
- ", history=" + affCache.keySet() +
", topVer=" + topVer +
- ", head=" + head.get().topologyVersion() + ']');
+ ", head=" + head.get().topologyVersion() +
+ ", history=" + affCache.keySet() +
+ ']');
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index d8d029e..2ca7687 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1337,8 +1337,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final Collection<KeyCacheObject> loadedKeys = new GridConcurrentHashSet<>();
- IgniteInternalFuture<Object> readFut =
- readThroughAllAsync(absentKeys, true, skipVals, null, subjId, taskName, new CI2<KeyCacheObject, Object>() {
+ IgniteInternalFuture<Object> readFut = readThroughAllAsync(absentKeys, true, skipVals, null,
+ subjId, taskName, new CI2<KeyCacheObject, Object>() {
/** Version for all loaded entries. */
private GridCacheVersion nextVer = ctx.versions().next();
@@ -1968,7 +1968,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param filter Optional filter.
* @return Put operation future.
*/
- public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val, @Nullable final CacheEntryPredicate... filter) {
+ public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val,
+ @Nullable final CacheEntryPredicate... filter) {
A.notNull(key, "key", val, "val");
if (keyCheck)
@@ -3137,7 +3138,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout) throws IgniteCheckedException {
+ @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout)
+ throws IgniteCheckedException {
if (F.isEmpty(keys))
return true;
@@ -3711,7 +3713,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1)
return localIteratorHonorExpirePolicy(opCtx);
- CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, ctx.keepPortable())
+ CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, null, ctx.keepPortable())
.keepAll(false)
.execute();
@@ -3944,7 +3946,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return t;
}
- catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException | IgniteTxRollbackCheckedException e) {
+ catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException |
+ IgniteTxRollbackCheckedException e) {
throw e;
}
catch (IgniteCheckedException e) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 3236bb5..3df45cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -59,7 +59,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
private static final int EXCHANGE_HISTORY_SIZE = 1000;
/** Cleanup history size. */
- public static final int EXCH_FUT_CLEANUP_HISTORY_SIZE = 10;
+ public static final int EXCH_FUT_CLEANUP_HISTORY_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 100);
/** Atomic reference for pending timeout object. */
private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 871cd77..5582ba7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1962,7 +1962,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.cacheType(cacheType);
- return F.first(initiateCacheChanges(F.asList(req)));
+ return F.first(initiateCacheChanges(F.asList(req), failIfExists));
}
/**
@@ -1972,14 +1972,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
public IgniteInternalFuture<?> dynamicStopCache(String cacheName) {
DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId(), true);
- return F.first(initiateCacheChanges(F.asList(t)));
+ return F.first(initiateCacheChanges(F.asList(t), false));
}
/**
* @param reqs Requests.
* @return Collection of futures.
*/
- public Collection<DynamicCacheStartFuture> initiateCacheChanges(Collection<DynamicCacheChangeRequest> reqs) {
+ @SuppressWarnings("TypeMayBeWeakened")
+ private Collection<DynamicCacheStartFuture> initiateCacheChanges(Collection<DynamicCacheChangeRequest> reqs,
+ boolean failIfExists) {
Collection<DynamicCacheStartFuture> res = new ArrayList<>(reqs.size());
Collection<DynamicCacheChangeRequest> sndReqs = new ArrayList<>(reqs.size());
@@ -2012,9 +2014,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
maskNull(req.cacheName()), fut);
if (old != null) {
- if (req.start() && !req.clientStartOnly()) {
- fut.onDone(new CacheExistsException("Failed to start cache " +
- "(a cache with the same name is already being started or stopped): " + req.cacheName()));
+ if (req.start()) {
+ if (!req.clientStartOnly()) {
+ if (failIfExists)
+ fut.onDone(new CacheExistsException("Failed to start cache " +
+ "(a cache with the same name is already being started or stopped): " +
+ req.cacheName()));
+ else {
+ fut = old;
+
+ continue;
+ }
+ }
+ else {
+ fut = old;
+
+ continue;
+ }
}
else {
fut = old;
@@ -2664,7 +2680,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.clientStartOnly(true);
- F.first(initiateCacheChanges(F.asList(req))).get();
+ F.first(initiateCacheChanges(F.asList(req), false)).get();
IgniteCacheProxy cache = jCacheProxies.get(masked);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 772e849..d0d9049 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -1251,7 +1251,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
checkIteratorQueue();
if (offHeapEnabled() && !swapEnabled())
- return rawOffHeapIterator(true, true);
+ return rawOffHeapIterator(null, true, true);
if (swapEnabled() && !offHeapEnabled())
return rawSwapIterator(true, true);
@@ -1267,7 +1267,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
private Map.Entry<byte[], byte[]> cur;
{
- it = rawOffHeapIterator(true, true);
+ it = rawOffHeapIterator(null, true, true);
advance();
}
@@ -1598,11 +1598,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/**
* @param c Key/value closure.
+ * @param part Partition.
* @param primary Include primaries.
* @param backup Include backups.
* @return Off-heap iterator.
*/
public <T> GridCloseableIterator<T> rawOffHeapIterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c,
+ Integer part,
boolean primary,
boolean backup)
{
@@ -1618,24 +1620,31 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
- Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
- cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+ Set<Integer> parts;
+
+ if (part == null)
+ parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
+ cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+ else
+ parts = Collections.singleton(part);
return new CloseablePartitionsIterator<T, T>(parts) {
@Override protected GridCloseableIterator<T> partitionIterator(int part)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
return offheap.iterator(spaceName, c, part);
}
};
}
/**
+ *
+ * @param part Partition.
* @param primary Include primaries.
* @param backup Include backups.
* @return Raw off-heap iterator.
*/
- public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(final boolean primary,
+ public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(@Nullable Integer part,
+ final boolean primary,
final boolean backup)
{
if (!offheapEnabled || (!primary && !backup))
@@ -1673,8 +1682,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
- Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
- cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+ Set<Integer> parts;
+
+ if (part == null)
+ parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
+ cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+ else
+ parts = Collections.singleton(part);
return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, IgniteBiTuple<byte[], byte[]>>(parts) {
private Map.Entry<byte[], byte[]> cur;
@@ -1751,6 +1765,29 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
}
/**
+ * @param part Partition.
+ * @return Raw off-heap iterator.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(int part)
+ throws IgniteCheckedException
+ {
+ if (!swapEnabled)
+ return new GridEmptyCloseableIterator<>();
+
+ checkIteratorQueue();
+
+ return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, Map.Entry<byte[], byte[]>>(
+ Collections.singleton(part)) {
+ @Override protected GridCloseableIterator<Map.Entry<byte[], byte[]>> partitionIterator(int part)
+ throws IgniteCheckedException
+ {
+ return swapMgr.rawIterator(spaceName, part);
+ }
+ };
+ }
+
+ /**
* @param primary If {@code true} includes primary entries.
* @param backup If {@code true} includes backup entries.
* @param topVer Topology version.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 4390993..69ce7b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -353,7 +353,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
if (filter instanceof ScanQuery) {
IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter();
- qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, isKeepPortable);
+ qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, ((ScanQuery)filter).getPartition(),
+ isKeepPortable);
if (grp != null)
qry.projection(grp);
@@ -496,10 +497,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal());
if (qry instanceof SqlQuery) {
- SqlQuery p = (SqlQuery)qry;
+ final SqlQuery p = (SqlQuery)qry;
if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
- return (QueryCursor<R>)new QueryCursorImpl<>(ctx.kernalContext().query().<K, V>queryLocal(ctx, p));
+ return (QueryCursor<R>)new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() {
+ @Override public Iterator<Cache.Entry<K, V>> iterator() {
+ return ctx.kernalContext().query().<K, V>queryLocal(ctx, p);
+ }
+ });
return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
index 7cb9efc..d68c377 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
@@ -27,6 +27,9 @@ import java.util.*;
* Query cursor implementation.
*/
public class QueryCursorImpl<T> implements QueryCursorEx<T> {
+ /** Query executor. */
+ private Iterable<T> iterExec;
+
/** */
private Iterator<T> iter;
@@ -34,18 +37,18 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
private boolean iterTaken;
/** */
- private Collection<GridQueryFieldMetadata> fieldsMeta;
+ private List<GridQueryFieldMetadata> fieldsMeta;
/**
- * @param iter Iterator.
+ * @param iterExec Query executor.
*/
- public QueryCursorImpl(Iterator<T> iter) {
- this.iter = iter;
+ public QueryCursorImpl(Iterable<T> iterExec) {
+ this.iterExec = iterExec;
}
/** {@inheritDoc} */
@Override public Iterator<T> iterator() {
- if (iter == null)
+ if (iter == null && iterTaken)
throw new IgniteException("Cursor is closed.");
if (iterTaken)
@@ -53,12 +56,16 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
iterTaken = true;
+ iter = iterExec.iterator();
+
+ assert iter != null;
+
return iter;
}
/** {@inheritDoc} */
@Override public List<T> getAll() {
- ArrayList<T> all = new ArrayList<>();
+ List<T> all = new ArrayList<>();
try {
for (T t : this) // Implicitly calls iterator() to do all checks.
@@ -103,14 +110,14 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
/**
* @param fieldsMeta SQL Fields query result metadata.
*/
- public void fieldsMeta(Collection<GridQueryFieldMetadata> fieldsMeta) {
+ public void fieldsMeta(List<GridQueryFieldMetadata> fieldsMeta) {
this.fieldsMeta = fieldsMeta;
}
/**
* @return SQL Fields query result metadata.
*/
- public Collection<GridQueryFieldMetadata> fieldsMeta() {
+ @Override public List<GridQueryFieldMetadata> fieldsMeta() {
return fieldsMeta;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 0749f66..8ac3809 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -149,6 +149,13 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
}
/**
+ * @return Keys belonging to partition.
+ */
+ public Set<KeyCacheObject> keySet() {
+ return map.keySet();
+ }
+
+ /**
* @return Entries belonging to partition.
*/
public Collection<GridDhtCacheEntry> entries() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index 0658828..2d2db1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -76,7 +76,7 @@ import org.jetbrains.annotations.*;
* </li>
* <li>
* Joins will work correctly only if joined objects are stored in
- * collocated mode or at least one side of the join is stored in
+ * colocated mode or at least one side of the join is stored in
* {@link org.apache.ignite.cache.CacheMode#REPLICATED} cache. Refer to
* {@link AffinityKey} javadoc for more information about colocation.
* </li>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index a579aab..2b93144 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -229,6 +229,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
false,
null,
req.keyValueFilter(),
+ req.partition(),
req.className(),
req.clause(),
req.includeMetaData(),
@@ -518,6 +519,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
qry.query().clause(),
clsName,
qry.query().scanFilter(),
+ qry.query().partition(),
qry.reducer(),
qry.transform(),
qry.query().pageSize(),
@@ -626,6 +628,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
qry.query().clause(),
null,
null,
+ null,
qry.reducer(),
qry.transform(),
qry.query().pageSize(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 7f0a5ec..5b82c34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -21,13 +21,17 @@ import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.query.*;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.security.*;
+
import org.jetbrains.annotations.*;
import java.util.*;
@@ -38,6 +42,13 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
* Query adapter.
*/
public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
+ /** Is local node predicate. */
+ private static final IgnitePredicate<ClusterNode> IS_LOC_NODE = new IgnitePredicate<ClusterNode>() {
+ @Override public boolean apply(ClusterNode n) {
+ return n.isLocal();
+ }
+ };
+
/** */
private final GridCacheContext<?, ?> cctx;
@@ -56,6 +67,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/** */
private final IgniteBiPredicate<Object, Object> filter;
+ /** Partition. */
+ private Integer part;
+
/** */
private final boolean incMeta;
@@ -95,6 +109,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
* @param clsName Class name.
* @param clause Clause.
* @param filter Scan filter.
+ * @param part Partition.
* @param incMeta Include metadata flag.
* @param keepPortable Keep portable flag.
*/
@@ -103,16 +118,19 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
@Nullable String clsName,
@Nullable String clause,
@Nullable IgniteBiPredicate<Object, Object> filter,
+ @Nullable Integer part,
boolean incMeta,
boolean keepPortable) {
assert cctx != null;
assert type != null;
+ assert part == null || part >= 0;
this.cctx = cctx;
this.type = type;
this.clsName = clsName;
this.clause = clause;
this.filter = filter;
+ this.part = part;
this.incMeta = incMeta;
this.keepPortable = keepPortable;
@@ -132,6 +150,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
* @param dedup Enable dedup flag.
* @param prj Grid projection.
* @param filter Key-value filter.
+ * @param part Partition.
* @param clsName Class name.
* @param clause Clause.
* @param incMeta Include metadata flag.
@@ -149,6 +168,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
boolean dedup,
ClusterGroup prj,
IgniteBiPredicate<Object, Object> filter,
+ @Nullable Integer part,
@Nullable String clsName,
String clause,
boolean incMeta,
@@ -165,6 +185,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
this.dedup = dedup;
this.prj = prj;
this.filter = filter;
+ this.part = part;
this.clsName = clsName;
this.clause = clause;
this.incMeta = incMeta;
@@ -334,6 +355,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
}
/**
+ * @return Partition.
+ */
+ @Nullable public Integer partition() {
+ return part;
+ }
+
+ /**
* @throws IgniteCheckedException If query is invalid.
*/
public void validate() throws IgniteCheckedException {
@@ -410,16 +438,18 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
taskHash = cctx.kernalContext().job().currentTaskNameHash();
- GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer<Object, Object>)rmtReducer,
+ final GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer<Object, Object>)rmtReducer,
(IgniteClosure<Object, Object>)rmtTransform, args);
- GridCacheQueryManager qryMgr = cctx.queries();
+ final GridCacheQueryManager qryMgr = cctx.queries();
boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId());
if (type == SQL_FIELDS || type == SPI)
return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
qryMgr.queryFieldsDistributed(bean, nodes));
+ else if (type == SCAN && part != null && nodes.size() > 1)
+ return new CacheQueryFallbackFuture<>(nodes, bean, qryMgr);
else
return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
}
@@ -439,15 +469,15 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
return Collections.singletonList(cctx.localNode());
case REPLICATED:
- if (prj != null)
- return nodes(cctx, prj);
+ if (prj != null || partition() != null)
+ return nodes(cctx, prj, partition());
return cctx.affinityNode() ?
Collections.singletonList(cctx.localNode()) :
- Collections.singletonList(F.rand(nodes(cctx, null)));
+ Collections.singletonList(F.rand(nodes(cctx, null, partition())));
case PARTITIONED:
- return nodes(cctx, prj);
+ return nodes(cctx, prj, partition());
default:
throw new IllegalStateException("Unknown cache distribution mode: " + cacheMode);
@@ -459,17 +489,26 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
* @param prj Projection (optional).
* @return Collection of data nodes in provided projection (if any).
*/
- private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, @Nullable final ClusterGroup prj) {
+ private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx,
+ @Nullable final ClusterGroup prj, @Nullable final Integer part) {
assert cctx != null;
+ final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+
Collection<ClusterNode> affNodes = CU.affinityNodes(cctx);
- if (prj == null)
+ if (prj == null && part == null)
return affNodes;
+ final Set<ClusterNode> owners =
+ part == null ? Collections.<ClusterNode>emptySet() : new HashSet<>(cctx.topology().owners(part, topVer));
+
return F.view(affNodes, new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode n) {
- return prj.node(n.id()) != null;
+
+ return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
+ (prj == null || prj.node(n.id()) != null) &&
+ (part == null || owners.contains(n));
}
});
}
@@ -478,4 +517,94 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
@Override public String toString() {
return S.toString(GridCacheQueryAdapter.class, this);
}
+
+ /**
+ * Wrapper for queries with fallback.
+ */
+ private static class CacheQueryFallbackFuture<R> extends GridFutureAdapter<Collection<R>>
+ implements CacheQueryFuture<R> {
+ /** Query future. */
+ private volatile GridCacheQueryFutureAdapter<?, ?, R> fut;
+
+ /** Backups. */
+ private final Queue<ClusterNode> nodes;
+
+ /** Bean. */
+ private final GridCacheQueryBean bean;
+
+ /** Query manager. */
+ private final GridCacheQueryManager qryMgr;
+
+ /**
+ * @param nodes Backups.
+ * @param bean Bean.
+ * @param qryMgr Query manager.
+ */
+ public CacheQueryFallbackFuture(Collection<ClusterNode> nodes, GridCacheQueryBean bean,
+ GridCacheQueryManager qryMgr) {
+ this.nodes = fallbacks(nodes);
+ this.bean = bean;
+ this.qryMgr = qryMgr;
+
+ init();
+ }
+
+ /**
+ * @param nodes Nodes.
+ */
+ private Queue<ClusterNode> fallbacks(Collection<ClusterNode> nodes) {
+ Queue<ClusterNode> fallbacks = new LinkedList<>();
+
+ ClusterNode node = F.first(F.view(nodes, IS_LOC_NODE));
+
+ if (node != null)
+ fallbacks.add(node);
+
+ fallbacks.addAll(node != null ? F.view(nodes, F.not(IS_LOC_NODE)) : nodes);
+
+ return fallbacks;
+ }
+
+ /**
+ *
+ */
+ private void init() {
+ ClusterNode node = nodes.poll();
+
+ GridCacheQueryFutureAdapter<?, ?, R> fut0 =
+ (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? qryMgr.queryLocal(bean) :
+ qryMgr.queryDistributed(bean, Collections.singleton(node)));
+
+ fut0.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
+ @Override public void apply(IgniteInternalFuture<Collection<R>> fut) {
+ try {
+ onDone(fut.get());
+ }
+ catch (IgniteCheckedException e) {
+ if (F.isEmpty(nodes))
+ onDone(e);
+ else
+ init();
+ }
+ }
+ });
+
+ fut = fut0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int available() {
+ return fut.available();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean cancel() throws IgniteCheckedException {
+ return fut.cancel();
+ }
+
+ /** {@inheritDoc} */
+ @Override public R next() {
+ return fut.next();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 32e9d63..6e71ba7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -52,6 +52,8 @@ import java.util.concurrent.*;
import static org.apache.ignite.cache.CacheMode.*;
import static org.apache.ignite.events.EventType.*;
import static org.apache.ignite.internal.GridClosureCallMode.*;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*;
/**
@@ -111,8 +113,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
final Object recipient = recipient(nodeId, entry.getKey());
entry.getValue().listen(new CIX1<IgniteInternalFuture<QueryResult<K, V>>>() {
- @Override
- public void applyx(IgniteInternalFuture<QueryResult<K, V>> f)
+ @Override public void applyx(IgniteInternalFuture<QueryResult<K, V>> f)
throws IgniteCheckedException {
f.get().closeIfNotShared(recipient);
}
@@ -768,98 +769,138 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
final boolean backups = qry.includeBackups() || cctx.isReplicated();
- final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
- private IgniteBiTuple<K, V> next;
+ final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
+ new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
+ private IgniteBiTuple<K, V> next;
- private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
+ private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
- private Iterator<K> iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator();
+ private Iterator<K> iter;
- {
- advance();
- }
+ private GridDhtLocalPartition locPart;
- @Override public boolean onHasNext() {
- return next != null;
- }
+ {
+ Integer part = qry.partition();
- @Override public IgniteBiTuple<K, V> onNext() {
- if (next == null)
- throw new NoSuchElementException();
+ if (part == null || dht == null)
+ iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator();
+ else if (part < 0 || part >= cctx.affinity().partitions())
+ iter = F.emptyIterator();
+ else {
+ AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
- IgniteBiTuple<K, V> next0 = next;
+ locPart = dht.topology().localPartition(part, topVer, false);
- advance();
+ if (locPart == null || (locPart.state() != OWNING && locPart.state() != RENTING) ||
+ !locPart.reserve())
+ throw new GridDhtInvalidPartitionException(part, "Partition can't be reserved");
- return next0;
- }
+ iter = new Iterator<K>() {
+ private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
- private void advance() {
- IgniteBiTuple<K, V> next0 = null;
-
- while (iter.hasNext()) {
- next0 = null;
+ @Override public boolean hasNext() {
+ return iter0.hasNext();
+ }
- K key = iter.next();
+ @Override public K next() {
+ KeyCacheObject key = iter0.next();
- V val;
+ return key.value(cctx.cacheObjectContext(), false);
+ }
- try {
- val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc);
+ @Override public void remove() {
+ iter0.remove();
+ }
+ };
}
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to peek value: " + e);
- val = null;
- }
+ advance();
+ }
- if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
- dht.sendTtlUpdateRequest(expiryPlc);
+ @Override public boolean onHasNext() {
+ return next != null;
+ }
- expiryPlc = cctx.cache().expiryPolicy(plc);
- }
+ @Override public IgniteBiTuple<K, V> onNext() {
+ if (next == null)
+ throw new NoSuchElementException();
- if (val != null) {
- next0 = F.t(key, val);
+ IgniteBiTuple<K, V> next0 = next;
- if (checkPredicate(next0))
- break;
- else
- next0 = null;
- }
+ advance();
+
+ return next0;
}
- next = next0 != null ?
- new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
- null;
+ private void advance() {
+ IgniteBiTuple<K, V> next0 = null;
- if (next == null)
- sendTtlUpdate();
- }
+ while (iter.hasNext()) {
+ next0 = null;
- @Override protected void onClose() {
- sendTtlUpdate();
- }
+ K key = iter.next();
+
+ V val;
- private void sendTtlUpdate() {
- if (dht != null && expiryPlc != null) {
- dht.sendTtlUpdateRequest(expiryPlc);
+ try {
+ val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc);
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to peek value: " + e);
+
+ val = null;
+ }
- expiryPlc = null;
+ if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
+ dht.sendTtlUpdateRequest(expiryPlc);
+
+ expiryPlc = cctx.cache().expiryPolicy(plc);
+ }
+
+ if (val != null) {
+ next0 = F.t(key, val);
+
+ if (checkPredicate(next0))
+ break;
+ else
+ next0 = null;
+ }
+ }
+
+ next = next0 != null ?
+ new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
+ null;
+
+ if (next == null)
+ sendTtlUpdate();
}
- }
- private boolean checkPredicate(Map.Entry<K, V> e) {
- if (keyValFilter != null) {
- Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable());
+ @Override protected void onClose() {
+ sendTtlUpdate();
- return keyValFilter.apply(e0.getKey(), e0.getValue());
+ if (locPart != null)
+ locPart.release();
}
- return true;
- }
- };
+ private void sendTtlUpdate() {
+ if (dht != null && expiryPlc != null) {
+ dht.sendTtlUpdateRequest(expiryPlc);
+
+ expiryPlc = null;
+ }
+ }
+
+ private boolean checkPredicate(Map.Entry<K, V> e) {
+ if (keyValFilter != null) {
+ Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable());
+
+ return keyValFilter.apply(e0.getKey(), e0.getValue());
+ }
+
+ return true;
+ }
+ };
final GridIterator<IgniteBiTuple<K, V>> it;
@@ -914,7 +955,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
throws IgniteCheckedException {
IgniteBiPredicate<K, V> filter = qry.scanFilter();
- Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawSwapIterator(true, backups);
+ Integer part = qry.partition();
+
+ Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups) :
+ cctx.swap().rawSwapIterator(part);
return scanIterator(it, filter, qry.keepPortable());
}
@@ -930,10 +974,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
if (cctx.offheapTiered() && filter != null) {
OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepPortable());
- return cctx.swap().rawOffHeapIterator(c, true, backups);
+ return cctx.swap().rawOffHeapIterator(c, qry.partition(), true, backups);
}
else {
- Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(true, backups);
+ Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(qry.partition(), true, backups);
return scanIterator(it, filter, qry.keepPortable());
}
@@ -1222,7 +1266,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
try {
// Preparing query closures.
- IgniteClosure<Map.Entry<K, V>, Object> trans = (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer();
+ IgniteClosure<Map.Entry<K, V>, Object> trans =
+ (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer();
+
IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer();
injectResources(trans);
@@ -1282,9 +1328,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
K key = row.getKey();
- // Filter backups for SCAN queries. Other types are filtered in indexing manager.
- if (!cctx.isReplicated() && cctx.config().getCacheMode() != LOCAL && qry.type() == SCAN &&
- !incBackups && !cctx.affinity().primary(cctx.localNode(), key, topVer)) {
+ // Filter backups for SCAN queries, if it isn't partition scan.
+ // Other types are filtered in indexing manager.
+ if (!cctx.isReplicated() && qry.type() == SCAN && qry.partition() == null &&
+ cctx.config().getCacheMode() != LOCAL && !incBackups &&
+ !cctx.affinity().primary(cctx.localNode(), key, topVer)) {
if (log.isDebugEnabled())
log.debug("Ignoring backup element [row=" + row +
", cacheMode=" + cctx.config().getCacheMode() + ", incBackups=" + incBackups +
@@ -1529,11 +1577,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
fut.onDone(executeQuery(qryInfo.query(), qryInfo.arguments(), false,
qryInfo.query().subjectId(), taskName, recipient(qryInfo.senderId(), qryInfo.requestId())));
}
- catch (Error e) {
- fut.onDone(e);
-
- throw e;
- }
catch (Throwable e) {
fut.onDone(e);
@@ -1843,7 +1886,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
return new IgniteBiPredicate<K, V>() {
@Override public boolean apply(K k, V v) {
- return cache.context().affinity().primary(ctx.discovery().localNode(), k, AffinityTopologyVersion.NONE);
+ return cache.context().affinity().primary(ctx.discovery().localNode(), k, NONE);
}
};
}
@@ -2920,6 +2963,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
null,
null,
null,
+ null,
false,
keepPortable);
}
@@ -2928,17 +2972,19 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* Creates user's predicate based scan query.
*
* @param filter Scan filter.
+ * @param part Partition.
* @param keepPortable Keep portable flag.
* @return Created query.
*/
- @SuppressWarnings("unchecked")
public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
- boolean keepPortable) {
+ @Nullable Integer part, boolean keepPortable) {
+
return new GridCacheQueryAdapter<>(cctx,
SCAN,
null,
null,
(IgniteBiPredicate<Object, Object>)filter,
+ part,
false,
keepPortable);
}
@@ -2962,6 +3008,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
clsName,
search,
null,
+ null,
false,
keepPortable);
}
@@ -2982,6 +3029,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
null,
qry,
null,
+ null,
false,
keepPortable);
}
@@ -3002,6 +3050,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
null,
qry,
null,
+ null,
incMeta,
keepPortable);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 845077f..2113e7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -26,6 +26,8 @@ import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
import java.io.*;
import java.nio.*;
import java.util.*;
@@ -109,6 +111,9 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
/** */
private int taskHash;
+ /** Partition. */
+ private int part;
+
/**
* Required by {@link Externalizable}
*/
@@ -173,6 +178,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
* @param clause Query clause.
* @param clsName Query class name.
* @param keyValFilter Key-value filter.
+ * @param part Partition.
* @param rdc Reducer.
* @param trans Transformer.
* @param pageSize Page size.
@@ -189,6 +195,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
String clause,
String clsName,
IgniteBiPredicate<Object, Object> keyValFilter,
+ @Nullable Integer part,
IgniteReducer<Object, Object> rdc,
IgniteClosure<Object, Object> trans,
int pageSize,
@@ -211,6 +218,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
this.clause = clause;
this.clsName = clsName;
this.keyValFilter = keyValFilter;
+ this.part = part == null ? -1 : part;
this.rdc = rdc;
this.trans = trans;
this.pageSize = pageSize;
@@ -414,6 +422,13 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
return taskHash;
}
+ /**
+ * @return partition.
+ */
+ @Nullable public Integer partition() {
+ return part == -1 ? null : part;
+ }
+
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -508,30 +523,36 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
writer.incrementState();
case 16:
- if (!writer.writeByteArray("rdcBytes", rdcBytes))
+ if (!writer.writeInt("part", part))
return false;
writer.incrementState();
case 17:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeByteArray("rdcBytes", rdcBytes))
return false;
writer.incrementState();
case 18:
- if (!writer.writeInt("taskHash", taskHash))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 19:
- if (!writer.writeByteArray("transBytes", transBytes))
+ if (!writer.writeInt("taskHash", taskHash))
return false;
writer.incrementState();
case 20:
+ if (!writer.writeByteArray("transBytes", transBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 21:
if (!writer.writeByte("type", type != null ? (byte)type.ordinal() : -1))
return false;
@@ -658,7 +679,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
reader.incrementState();
case 16:
- rdcBytes = reader.readByteArray("rdcBytes");
+ part = reader.readInt("part");
if (!reader.isLastRead())
return false;
@@ -666,7 +687,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
reader.incrementState();
case 17:
- subjId = reader.readUuid("subjId");
+ rdcBytes = reader.readByteArray("rdcBytes");
if (!reader.isLastRead())
return false;
@@ -674,7 +695,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
reader.incrementState();
case 18:
- taskHash = reader.readInt("taskHash");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -682,7 +703,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
reader.incrementState();
case 19:
- transBytes = reader.readByteArray("transBytes");
+ taskHash = reader.readInt("taskHash");
if (!reader.isLastRead())
return false;
@@ -690,6 +711,14 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
reader.incrementState();
case 20:
+ transBytes = reader.readByteArray("transBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 21:
byte typeOrd;
typeOrd = reader.readByte("type");
@@ -713,7 +742,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 21;
+ return 22;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
index bf1d4ea..5e19b99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.query;
import org.apache.ignite.*;
import org.apache.ignite.cache.query.*;
+import org.apache.ignite.internal.processors.query.*;
+
+import java.util.*;
/**
* Extended query cursor interface allowing for "getAll" to output data into destination other than Collection.
@@ -32,6 +35,11 @@ public interface QueryCursorEx<T> extends QueryCursor<T> {
public void getAll(Consumer<T> c) throws IgniteCheckedException;
/**
+ * @return Query metadata.
+ */
+ public List<GridQueryFieldMetadata> fieldsMeta();
+
+ /**
* Query value consumer.
*/
public static interface Consumer<T> {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index f516968..f74fe95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -114,7 +114,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
}
CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null,
- new GridSetQueryPredicate<>(id, collocated), false, false);
+ new GridSetQueryPredicate<>(id, collocated), null, false, false);
Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion());
@@ -345,7 +345,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
private GridCloseableIterator<T> iterator0() {
try {
CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null,
- new GridSetQueryPredicate<>(id, collocated), false, false);
+ new GridSetQueryPredicate<>(id, collocated), null, false, false);
Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 0bb820d..7fcc284 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -58,9 +58,11 @@ public interface GridQueryIndexing {
*
* @param cctx Cache context.
* @param qry Query.
+ * @param keepCacheObjects If {@code true}, cache objects representation will be preserved.
* @return Cursor.
*/
- public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry);
+ public Iterable<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry,
+ boolean keepCacheObjects);
/**
* Parses SQL query into two step query and executes it.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index ed8e1e2..e187713 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -539,16 +539,19 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param qry Query.
* @return Cursor.
*/
- public QueryCursor<List<?>> queryTwoStep(String space, GridCacheTwoStepQuery qry) {
+ public Iterable<List<?>> queryTwoStep(String space, GridCacheTwoStepQuery qry) {
checkxEnabled();
if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to execute query (grid is stopping).");
try {
+ GridCacheContext<Object, Object> cacheCtx = ctx.cache().internalCache(space).context();
+
return idx.queryTwoStep(
- ctx.cache().internalCache(space).context(),
- qry);
+ cacheCtx,
+ qry,
+ cacheCtx.keepPortable());
}
finally {
busyLock.leaveBusy();
@@ -715,12 +718,15 @@ public class GridQueryProcessor extends GridProcessorAdapter {
String sql = qry.getSql();
Object[] args = qry.getArgs();
- GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter());
+ final GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter());
sendQueryExecutedEvent(sql, args);
- QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
- new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable()));
+ QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
+ @Override public Iterator<List<?>> iterator() {
+ return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable());
+ }
+ });
cursor.fieldsMeta(res.metaData());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 64eb1c1..bb451c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -934,7 +934,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
GridCacheQueryManager qryMgr = cache.context().queries();
- CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, false);
+ CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, null, false);
qry.keepAll(false);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
index 42fe089..4946eb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
@@ -138,7 +138,7 @@ public final class GridJavaProcess {
procCommands.add(javaBin);
procCommands.addAll(jvmArgs == null ? U.jvmArgs() : jvmArgs);
- if (!jvmArgs.contains("-cp") && !jvmArgs.contains("-classpath")) {
+ if (jvmArgs == null || (!jvmArgs.contains("-cp") && !jvmArgs.contains("-classpath"))) {
String classpath = System.getProperty("java.class.path");
String sfcp = System.getProperty("surefire.test.class.path");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 0932212..9016b10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9025,11 +9025,11 @@ public abstract class IgniteUtils {
hasShmem = false;
else {
try {
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(null);
hasShmem = true;
}
- catch (IgniteCheckedException e) {
+ catch (IgniteCheckedException ignore) {
hasShmem = false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
index 27a234f..c935c4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
@@ -112,7 +112,7 @@ public class IpcSharedMemoryClientEndpoint implements IpcEndpoint {
boolean clear = true;
try {
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(log);
sock.connect(new InetSocketAddress("127.0.0.1", port), timeout);