You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/24 08:58:09 UTC
[01/14] ignite git commit: IGNITE-4728 Web Console: Saved last
succeeded state and redirect to it on reload.
Repository: ignite
Updated Branches:
refs/heads/ignite-5569-debug b8fee2ad9 -> 4a86a24cf
IGNITE-4728 Web Console: Saved last succeeded state and redirect to it on reload.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/70d0f991
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/70d0f991
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/70d0f991
Branch: refs/heads/ignite-5569-debug
Commit: 70d0f9918c708cb117e69163cc7b7c119c9a693c
Parents: 23f26af
Author: Dmitriy Shabalin <ds...@gridgain.com>
Authored: Thu Jul 20 15:08:20 2017 +0700
Committer: Andrey Novikov <an...@gridgain.com>
Committed: Thu Jul 20 15:08:20 2017 +0700
----------------------------------------------------------------------
modules/web-console/frontend/app/app.js | 14 ++++++++++++++
.../frontend/app/modules/states/errors.state.js | 6 ++++--
.../frontend/app/modules/states/signin.state.js | 10 +++++++++-
3 files changed, 27 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/70d0f991/modules/web-console/frontend/app/app.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/app.js b/modules/web-console/frontend/app/app.js
index c707810..dc5c6e9 100644
--- a/modules/web-console/frontend/app/app.js
+++ b/modules/web-console/frontend/app/app.js
@@ -289,6 +289,20 @@ angular
$root.$on('$stateChangeStart', () => {
_.forEach(angular.element('.modal'), (m) => angular.element(m).scope().$hide());
});
+
+ if (!$root.IgniteDemoMode) {
+ $root.$on('$stateChangeSuccess', (event, {name, unsaved}, params) => {
+ try {
+ if (unsaved)
+ localStorage.removeItem('lastStateChangeSuccess');
+ else
+ localStorage.setItem('lastStateChangeSuccess', JSON.stringify({name, params}));
+ }
+ catch (ignored) {
+ // No-op.
+ }
+ });
+ }
}])
.run(['$rootScope', '$http', '$state', 'IgniteMessages', 'User', 'IgniteNotebookData',
($root, $http, $state, Messages, User, Notebook) => { // eslint-disable-line no-shadow
http://git-wip-us.apache.org/repos/asf/ignite/blob/70d0f991/modules/web-console/frontend/app/modules/states/errors.state.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/modules/states/errors.state.js b/modules/web-console/frontend/app/modules/states/errors.state.js
index e816ff8..e3d4d41 100644
--- a/modules/web-console/frontend/app/modules/states/errors.state.js
+++ b/modules/web-console/frontend/app/modules/states/errors.state.js
@@ -31,13 +31,15 @@ angular
templateUrl: templateNotFoundPage,
metaTags: {
title: 'Page not found'
- }
+ },
+ unsaved: true
})
.state('403', {
url: '/403',
templateUrl: templateNotAuthorizedPage,
metaTags: {
title: 'Not authorized'
- }
+ },
+ unsaved: true
});
}]);
http://git-wip-us.apache.org/repos/asf/ignite/blob/70d0f991/modules/web-console/frontend/app/modules/states/signin.state.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/modules/states/signin.state.js b/modules/web-console/frontend/app/modules/states/signin.state.js
index 5155bde..b7be51d 100644
--- a/modules/web-console/frontend/app/modules/states/signin.state.js
+++ b/modules/web-console/frontend/app/modules/states/signin.state.js
@@ -33,7 +33,15 @@ angular
resolve: {
user: ['$state', 'User', ($state, User) => {
return User.read()
- .then(() => $state.go('base.configuration.tabs'))
+ .then(() => {
+ try {
+ const {name, params} = JSON.parse(localStorage.getItem('lastStateChangeSuccess'));
+
+ $state.go(name, params);
+ } catch (ignored) {
+ $state.go('base.configuration.tabs');
+ }
+ })
.catch(() => {});
}]
},
[10/14] ignite git commit: IGNITE-5772 - Fixed race between WAL
segment rollover and a concurrent log. Closes #2313
Posted by sb...@apache.org.
IGNITE-5772 - Fixed race between WAL segment rollover and a concurrent log. Closes #2313
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6de0571c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6de0571c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6de0571c
Branch: refs/heads/ignite-5569-debug
Commit: 6de0571c21ffdb77af7bb1d18e9659126d7f321b
Parents: 199b954
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Fri Jul 21 16:35:43 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Jul 21 16:35:43 2017 +0300
----------------------------------------------------------------------
.../wal/FileWriteAheadLogManager.java | 93 +++++++++++++-------
1 file changed, 61 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6de0571c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 897f903..b655ddf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -319,7 +319,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
try {
if (mode == WALMode.BACKGROUND) {
if (currHnd != null)
- currHnd.flush((FileWALPointer)null);
+ currHnd.flush((FileWALPointer)null, true);
}
if (currHnd != null)
@@ -526,7 +526,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
return;
if (mode == WALMode.LOG_ONLY || forceFlush) {
- cur.flushOrWait(filePtr);
+ cur.flushOrWait(filePtr, false);
return;
}
@@ -535,7 +535,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (filePtr != null && !cur.needFsync(filePtr))
return;
- cur.fsync(filePtr);
+ cur.fsync(filePtr, false);
}
/** {@inheritDoc} */
@@ -1700,12 +1700,29 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
this.maxSegmentSize = maxSegmentSize;
this.serializer = serializer;
- head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0)));
+ head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0), false));
written = pos;
lastFsyncPos = pos;
}
/**
+ * Checks if current head is a close fake record and returns {@code true} if so.
+ *
+ * @return {@code true} if the current head is a close record.
+ */
+ private boolean stopped() {
+ return stopped(head.get());
+ }
+
+ /**
+ * @param record Record to check.
+ * @return {@code true} if the record is a fake close record.
+ */
+ private boolean stopped(WALRecord record) {
+ return record instanceof FakeRecord && ((FakeRecord)record).stop;
+ }
+
+ /**
* @param rec Record to be added to record chain as new {@link #head}
* @return Pointer or null if roll over to next segment is required or already started by other thread.
* @throws StorageException If failed.
@@ -1721,9 +1738,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
long nextPos = nextPosition(h);
- // It is important that we read `stop` after `head` in this loop for correct close,
- // because otherwise we will have a race on the last flush in close.
- if (nextPos + rec.size() >= maxSegmentSize || stop.get()) {
+ if (nextPos + rec.size() >= maxSegmentSize || stopped(h)) {
// Can not write to this segment, need to switch to the next one.
return null;
}
@@ -1731,7 +1746,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
int newChainSize = h.chainSize() + rec.size();
if (newChainSize > tlbSize && !flushed) {
- boolean res = h.previous() == null || flush(h);
+ boolean res = h.previous() == null || flush(h, false);
if (rec.size() > tlbSize)
flushed = res;
@@ -1770,7 +1785,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @param ptr Pointer.
* @throws IgniteCheckedException If failed.
*/
- private void flushOrWait(FileWALPointer ptr) throws IgniteCheckedException {
+ private void flushOrWait(FileWALPointer ptr, boolean stop) throws IgniteCheckedException {
long expWritten;
if (ptr != null) {
@@ -1783,7 +1798,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
else // We read head position before the flush because otherwise we can get wrong position.
expWritten = recordOffset(head.get());
- if (flush(ptr))
+ if (flush(ptr, stop))
return;
// Spin-wait for a while before acquiring the lock.
@@ -1810,18 +1825,20 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @throws IgniteCheckedException If failed.
* @throws StorageException If failed.
*/
- private boolean flush(FileWALPointer ptr) throws IgniteCheckedException, StorageException {
+ private boolean flush(FileWALPointer ptr, boolean stop) throws IgniteCheckedException, StorageException {
if (ptr == null) { // Unconditional flush.
for (; ; ) {
WALRecord expHead = head.get();
if (expHead.previous() == null) {
- assert expHead instanceof FakeRecord;
+ FakeRecord frHead = (FakeRecord)expHead;
- return false;
+ if (frHead.stop == stop || frHead.stop ||
+ head.compareAndSet(expHead, new FakeRecord(frHead.position(), stop)))
+ return false;
}
- if (flush(expHead))
+ if (flush(expHead, stop))
return true;
}
}
@@ -1835,7 +1852,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (chainBeginPosition(h) > ptr.fileOffset())
return false;
- if (flush(h))
+ if (flush(h, stop))
return true; // We are lucky.
}
}
@@ -1853,17 +1870,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @throws IgniteCheckedException If failed.
* @throws StorageException If failed.
*/
- private boolean flush(WALRecord expHead) throws StorageException, IgniteCheckedException {
+ private boolean flush(WALRecord expHead, boolean stop) throws StorageException, IgniteCheckedException {
if (expHead.previous() == null) {
- assert expHead instanceof FakeRecord;
+ FakeRecord frHead = (FakeRecord)expHead;
- return false;
+ if (stop == frHead.stop)
+ return false;
}
// Fail-fast before CAS.
checkEnvironment();
- if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(idx, (int)nextPosition(expHead), 0))))
+ if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(idx, (int)nextPosition(expHead), 0), stop)))
return false;
// At this point we grabbed the piece of WAL chain.
@@ -1976,7 +1994,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @param ptr Pointer to sync.
* @throws StorageException If failed.
*/
- private void fsync(FileWALPointer ptr) throws StorageException, IgniteCheckedException {
+ private void fsync(FileWALPointer ptr, boolean stop) throws StorageException, IgniteCheckedException {
lock.lock();
try {
@@ -1984,7 +2002,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (!needFsync(ptr))
return;
- if (fsyncDelay > 0 && !stop.get()) {
+ if (fsyncDelay > 0 && !stopped()) {
// Delay fsync to collect as many updates as possible: trade latency for throughput.
U.await(fsync, fsyncDelay, TimeUnit.NANOSECONDS);
@@ -1993,7 +2011,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
}
- flushOrWait(ptr);
+ flushOrWait(ptr, stop);
if (lastFsyncPos != written) {
assert lastFsyncPos < written; // Fsync position must be behind.
@@ -2031,13 +2049,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @throws StorageException If failed.
*/
private boolean close(boolean rollOver) throws IgniteCheckedException, StorageException {
- if (stop.compareAndSet(false, true)) {
- // Here we can be sure that no other records will be added and this fsync will be the last.
- if (mode == WALMode.DEFAULT)
- fsync(null);
- else
- flushOrWait(null);
+ if (mode == WALMode.DEFAULT)
+ fsync(null, true);
+ else
+ flushOrWait(null, true);
+
+ assert stopped() : "Segment is not closed after close flush: " + head.get();
+ if (stop.compareAndSet(false, true)) {
try {
int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE;
@@ -2068,8 +2087,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
return true;
}
-
- return false;
+ else
+ return false;
}
/**
@@ -2271,17 +2290,27 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* Fake record is allowed to have no previous record.
*/
private static final class FakeRecord extends WALRecord {
+ /** */
+ private final boolean stop;
+
/**
* @param pos Position.
*/
- FakeRecord(FileWALPointer pos) {
+ FakeRecord(FileWALPointer pos, boolean stop) {
position(pos);
+
+ this.stop = stop;
}
/** {@inheritDoc} */
@Override public RecordType type() {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public FileWALPointer position() {
+ return (FileWALPointer) super.position();
+ }
}
/**
@@ -2492,7 +2521,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
private void doFlush() {
final FileWriteHandle hnd = currentHandle();
try {
- hnd.flush(hnd.head.get());
+ hnd.flush(hnd.head.get(), false);
}
catch (Exception e) {
U.warn(log, "Failed to flush WAL record queue", e);
[05/14] ignite git commit: Assembly procedure fix
Posted by sb...@apache.org.
Assembly procedure fix
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/db43b0c4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/db43b0c4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/db43b0c4
Branch: refs/heads/ignite-5569-debug
Commit: db43b0c40161f8756ab5cad0c1b4b404f0743d8f
Parents: bd7a08e
Author: Anton Vinogradov <av...@apache.org>
Authored: Thu Jul 20 14:58:09 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Jul 20 14:58:09 2017 +0300
----------------------------------------------------------------------
DEVNOTES.txt | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/db43b0c4/DEVNOTES.txt
----------------------------------------------------------------------
diff --git a/DEVNOTES.txt b/DEVNOTES.txt
index 8a689b8..340153e 100644
--- a/DEVNOTES.txt
+++ b/DEVNOTES.txt
@@ -2,14 +2,14 @@ Ignite Fabric Maven Build Instructions
======================================
1) Optional: build Apache Ignite.NET as described at modules/platforms/dotnet/DEVNOTES.txt.
-2) Compile and package:
+2) Compile and install:
- mvn clean package -Pall-java,all-scala,licenses -DskipTests
+ mvn clean install -Pall-java,all-scala,licenses -DskipTests
or if you have built Apache Ignite.NET on the first step use following command:
(Note that 'doxygen' should be installed before running this command.)
- mvn clean package -Pall-java,all-scala,licenses -DskipTests -DclientDocs
+ mvn clean install -Pall-java,all-scala,licenses -DskipTests -DclientDocs
3) Javadoc generation (optional):
@@ -25,14 +25,14 @@ Ignite Fabric with LGPL Maven Build Instructions
======================================
1) Optional: build Apache Ignite.NET as described at modules/platforms/dotnet/DEVNOTES.txt.
-2) Compile and package:
+2) Compile and install:
- mvn clean package -Pall-java,all-scala,licenses -DskipTests
+ mvn clean install -Pall-java,all-scala,licenses -DskipTests
or if you have built Apache Ignite.NET on the first step use following command:
(Note that 'doxygen' should be installed before running this command.)
- mvn clean package -Pall-java,all-scala,licenses -DskipTests -DclientDocs
+ mvn clean install -Pall-java,all-scala,licenses -DskipTests -DclientDocs
3) Javadoc generation with LGPL (optional):
@@ -46,15 +46,15 @@ Look for apache-ignite-fabric-lgpl-<version>-bin.zip in ./target/bin directory.
Ignite Hadoop Accelerator Maven Build Instructions
============================================
-1) Compile and package:
+1) Compile and install:
- mvn clean package -Pall-java,all-scala,licenses -DskipTests
+ mvn clean install -Pall-java,all-scala,licenses -DskipTests
Use 'hadoop.version' parameter to build Ignite against a specific Hadoop version.
Use 'spark.version' parameter to build ignite-spark module for a specific Spark version. Version should be >= 2.0.0.
For example:
- mvn clean package -Pall-java,all-scala,licenses -DskipTests -Dhadoop.version=2.4.2 -Dspark.version=2.1.1
+ mvn clean install -Pall-java,all-scala,licenses -DskipTests -Dhadoop.version=2.4.2 -Dspark.version=2.1.1
2) Assembly Hadoop Accelerator:
mvn initialize -Prelease -Dignite.edition=hadoop
[13/14] ignite git commit: Test for cache partitions state,
fix for client cache start.
Posted by sb...@apache.org.
Test for cache partitions state, fix for client cache start.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aeb9336b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aeb9336b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aeb9336b
Branch: refs/heads/ignite-5569-debug
Commit: aeb9336b3b161ddfff73f17e41cd453409b84a16
Parents: ca496f6
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jul 24 11:47:16 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jul 24 11:47:16 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 51 ++-
.../dht/GridClientPartitionTopology.java | 7 +-
.../dht/GridDhtPartitionTopology.java | 12 +-
.../dht/GridDhtPartitionTopologyImpl.java | 45 +-
.../GridDhtPartitionsExchangeFuture.java | 120 +++---
.../GridCacheDatabaseSharedManager.java | 6 +-
.../CacheLateAffinityAssignmentTest.java | 36 +-
.../distributed/CachePartitionStateTest.java | 410 +++++++++++++++++++
.../TestCacheNodeExcludingFilter.java | 53 +++
.../db/IgnitePdsCacheRestoreTest.java | 208 ++++++++++
.../testsuites/IgniteCacheTestSuite6.java | 38 ++
.../ignite/testsuites/IgnitePdsTestSuite.java | 3 +
12 files changed, 863 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 79ab183..f519b4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -517,6 +517,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
}
+ for (DynamicCacheDescriptor desc : startDescs) {
+ if (desc.cacheConfiguration().getCacheMode() != LOCAL) {
+ CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
+
+ assert grp != null;
+
+ grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topVer), true);
+ }
+ }
+
cctx.cache().initCacheProxies(topVer, null);
cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null);
@@ -1299,6 +1309,19 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
+ * @param grpId Group ID.
+ * @return Group name for debug purpose.
+ */
+ private String debugGroupName(int grpId) {
+ CacheGroupDescriptor desc = caches.group(grpId);
+
+ if (desc != null)
+ return desc.cacheOrGroupName();
+ else
+ return "Unknown group: " + grpId;
+ }
+
+ /**
* @param fut Exchange future.
* @throws IgniteCheckedException If failed.
*/
@@ -1396,19 +1419,31 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* Called on exchange initiated by server node leave.
*
* @param fut Exchange future.
+ * @param crd Coordinator flag.
* @throws IgniteCheckedException If failed.
* @return {@code True} if affinity should be assigned by coordinator.
*/
- public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
+ public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
ClusterNode leftNode = fut.discoveryEvent().eventNode();
assert !leftNode.isClient() : leftNode;
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (grp.isLocal())
- continue;
+ if (crd) {
+ // Need to initialize CacheGroupHolders if this node becomes coordinator on this exchange.
+ forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
+ @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
+ CacheGroupHolder cache = groupHolder(fut.topologyVersion(), desc);
- grp.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+ cache.aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+ }
+ });
+ }
+ else {
+ forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+ @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
+ aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+ }
+ });
}
synchronized (mux) {
@@ -1433,12 +1468,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
@Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
CacheGroupHolder grpHolder = grpHolders.get(desc.groupId());
- if (grpHolder != null) {
- if (grpHolder.client()) // Affinity for non-client holders calculated in {@link #onServerLeft}.
- grpHolder.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
-
+ if (grpHolder != null)
return;
- }
// Need initialize holders and affinity if this node became coordinator during this exchange.
final Integer grpId = desc.groupId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index f4ed517..232ce38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -367,6 +367,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public GridDhtLocalPartition localPartition(int p) {
return localPartition(p, AffinityTopologyVersion.NONE, false);
}
@@ -830,7 +835,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public void onExchangeDone(AffinityAssignment assignment) {
+ @Override public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 5f76d12..d9e04a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -130,6 +130,15 @@ public interface GridDhtPartitionTopology {
throws GridDhtInvalidPartitionException;
/**
+ * Unconditionally creates partition during restore of persisted partition state.
+ *
+ * @param p Partition ID.
+ * @return Partition.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException;
+
+ /**
* @param topVer Topology version at the time of creation.
* @param p Partition ID.
* @param create If {@code true}, then partition will be created if it's not there.
@@ -331,6 +340,7 @@ public interface GridDhtPartitionTopology {
* Callback on exchange done.
*
* @param assignment New affinity assignment.
+ * @param updateRebalanceVer {@code True} if the rebalance state needs to be checked.
*/
- public void onExchangeDone(AffinityAssignment assignment);
+ public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 601da1b..5ef499c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -135,9 +135,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** */
private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
- /** */
- private volatile boolean treatAllPartAsLoc;
-
/**
* @param ctx Cache shared context.
* @param grp Cache group.
@@ -421,14 +418,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean affReady)
throws IgniteCheckedException {
- DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
- treatAllPartAsLoc = exchFut.activateCluster()
- || (discoEvt.type() == EventType.EVT_NODE_JOINED
- && discoEvt.eventNode().isLocal()
- && !ctx.kernalContext().clientNode()
- );
-
ClusterNode loc = ctx.localNode();
ctx.database().checkpointReadLock();
@@ -540,8 +529,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
- treatAllPartAsLoc = false;
-
boolean changed = false;
int num = grp.affinity().partitions();
@@ -692,6 +679,29 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
return loc;
}
+ /** {@inheritDoc} */
+ @Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException {
+ lock.writeLock().lock();
+
+ try {
+ GridDhtLocalPartition part = locParts.get(p);
+
+ if (part != null)
+ return part;
+
+ part = new GridDhtLocalPartition(ctx, grp, p);
+
+ locParts.set(p, part);
+
+ ctx.pageStore().onPartitionCreated(grp.groupId(), p);
+
+ return part;
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
/**
* @param p Partition number.
* @param topVer Topology version.
@@ -731,7 +741,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (loc != null && state == EVICTED) {
locParts.set(p, loc = null);
- if (!treatAllPartAsLoc && !belongs)
+ if (!belongs)
throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " +
"(often may be caused by inconsistent 'key.hashCode()' implementation) " +
"[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
@@ -741,7 +751,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
"[part=" + p + ", shouldBeMoving=" + loc.reload() + "]");
if (loc == null) {
- if (!treatAllPartAsLoc && !belongs)
+ if (!belongs)
throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong to " +
"local node (often may be caused by inconsistent 'key.hashCode()' implementation) " +
"[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
@@ -1499,12 +1509,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public void onExchangeDone(AffinityAssignment assignment) {
+ @Override public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer) {
lock.writeLock().lock();
try {
if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0)
rebuildDiff(assignment);
+
+ if (updateRebalanceVer)
+ updateRebalanceVersion(assignment.assignment());
}
finally {
lock.writeLock().unlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c4a4f83..cdb4bb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -193,9 +193,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** */
private CacheAffinityChangeMessage affChangeMsg;
- /** */
- private boolean clientOnlyExchange;
-
/** Init timestamp. Used to track the amount of time spent to complete the future. */
private long initTs;
@@ -485,26 +482,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
}
- else {
- cctx.activate();
-
- List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches =
- cctx.cache().cachesToStartOnLocalJoin();
-
- if (cctx.database().persistenceEnabled() &&
- !cctx.kernalContext().clientNode()) {
- List<DynamicCacheDescriptor> startDescs = new ArrayList<>();
-
- if (caches != null) {
- for (T2<DynamicCacheDescriptor, NearCacheConfiguration> c : caches)
- startDescs.add(c.get1());
- }
-
- cctx.database().readCheckpointAndRestoreMemory(startDescs);
- }
-
- cctx.cache().startCachesOnLocalJoin(caches, topVer);
- }
+ else
+ initCachesOnLocalJoin();
}
exchange = CU.clientNode(discoEvt.eventNode()) ?
@@ -571,6 +550,29 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* @throws IgniteCheckedException If failed.
*/
+ private void initCachesOnLocalJoin() throws IgniteCheckedException {
+ cctx.activate();
+
+ List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches =
+ cctx.cache().cachesToStartOnLocalJoin();
+
+ if (cctx.database().persistenceEnabled() && !cctx.kernalContext().clientNode()) {
+ List<DynamicCacheDescriptor> startDescs = new ArrayList<>();
+
+ if (caches != null) {
+ for (T2<DynamicCacheDescriptor, NearCacheConfiguration> c : caches)
+ startDescs.add(c.get1());
+ }
+
+ cctx.database().readCheckpointAndRestoreMemory(startDescs);
+ }
+
+ cctx.cache().startCachesOnLocalJoin(caches, topologyVersion());
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
private void initTopologies() throws IgniteCheckedException {
cctx.database().checkpointReadLock();
@@ -776,7 +778,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
warnNoAffinityNodes();
- centralizedAff = cctx.affinity().onServerLeft(this);
+ centralizedAff = cctx.affinity().onServerLeft(this, crd);
}
else
cctx.affinity().onServerJoin(this, crd);
@@ -788,40 +790,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @throws IgniteCheckedException If failed.
*/
private void clientOnlyExchange() throws IgniteCheckedException {
- clientOnlyExchange = true;
-
if (crd != null) {
- if (crd.isLocal()) {
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- boolean updateTop = !grp.isLocal() &&
- exchId.topologyVersion().equals(grp.localStartVersion());
-
- if (updateTop) {
- for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
- if (top.groupId() == grp.groupId()) {
- GridDhtPartitionFullMap fullMap = top.partitionMap(true);
-
- assert fullMap != null;
-
- grp.topology().update(topologyVersion(),
- fullMap,
- top.updateCounters(false),
- Collections.<Integer>emptySet());
+ assert !crd.isLocal() : crd;
- break;
- }
- }
- }
- }
- }
- else {
- if (!centralizedAff)
- sendLocalPartitions(crd);
+ if (!centralizedAff)
+ sendLocalPartitions(crd);
- initDone();
+ initDone();
- return;
- }
+ return;
}
else {
if (centralizedAff) { // Last server node failed.
@@ -896,8 +873,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
try {
long start = U.currentTimeMillis();
- IgniteInternalFuture fut = cctx.snapshot()
- .tryStartLocalSnapshotOperation(discoEvt);
+ IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(discoEvt);
if (fut != null) {
fut.get();
@@ -1122,6 +1098,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException {
assert node != null;
+ GridDhtPartitionsSingleMessage msg;
+
// Reset lost partition before send local partition to coordinator.
if (exchActions != null) {
Set<String> caches = exchActions.cachesToResetLostPartitions();
@@ -1130,22 +1108,32 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
resetLostPartitions(caches);
}
- GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage(
- node, exchangeId(), clientOnlyExchange, true);
+ if (cctx.kernalContext().clientNode()) {
+ msg = new GridDhtPartitionsSingleMessage(exchangeId(),
+ true,
+ null,
+ true);
+ }
+ else {
+ msg = cctx.exchange().createPartitionsSingleMessage(node,
+ exchangeId(),
+ false,
+ true);
+ }
Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
if (partHistReserved0 != null)
- m.partitionHistoryCounters(partHistReserved0);
+ msg.partitionHistoryCounters(partHistReserved0);
if (stateChangeExchange() && changeGlobalStateE != null)
- m.setError(changeGlobalStateE);
+ msg.setError(changeGlobalStateE);
if (log.isDebugEnabled())
- log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']');
+ log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + msg + ']');
try {
- cctx.io().send(node, m, SYSTEM_POOL);
+ cctx.io().send(node, msg, SYSTEM_POOL);
}
catch (ClusterTopologyCheckedException ignored) {
if (log.isDebugEnabled())
@@ -1318,7 +1306,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (err == null) {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (!grp.isLocal())
- grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()));
+ grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()), false);
}
}
@@ -1386,10 +1374,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
public void onReceive(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
assert msg != null;
assert msg.exchangeId().equals(exchId) : msg;
- assert msg.lastVersion() != null : msg;
- if (!msg.client())
+ if (!msg.client()) {
+ assert msg.lastVersion() != null : msg;
+
updateLastVersion(msg.lastVersion());
+ }
if (isDone()) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 39038ba..1797d64 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1560,8 +1560,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
T2<Integer, Long> fromWal = partStates.get(new T2<>(grpId, i));
- GridDhtLocalPartition part = grp.topology()
- .localPartition(i, AffinityTopologyVersion.NONE, true);
+ GridDhtLocalPartition part = grp.topology().forceCreatePartition(i);
assert part != null;
@@ -1621,8 +1620,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @param dataEntry Data entry to apply.
*/
private void applyUpdate(GridCacheContext cacheCtx, DataEntry dataEntry) throws IgniteCheckedException {
- GridDhtLocalPartition locPart = cacheCtx.topology()
- .localPartition(dataEntry.partitionId(), AffinityTopologyVersion.NONE, true);
+ GridDhtLocalPartition locPart = cacheCtx.topology().forceCreatePartition(dataEntry.partitionId());
switch (dataEntry.op()) {
case CREATE:
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 23043d1..7d8620a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -331,7 +331,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
};
- cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0)));
testAffinitySimpleSequentialStart();
@@ -351,7 +351,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
};
- cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(1)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1)));
startServer(0, 1);
@@ -391,7 +391,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
};
- cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2)));
startServer(0, 1);
startServer(1, 2);
@@ -439,7 +439,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
};
- cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0)));
Ignite ignite0 = startServer(0, 1);
@@ -467,7 +467,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
};
- cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0)));
Ignite ignite0 = startServer(0, 1);
@@ -520,7 +520,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
*/
private void cacheDestroyAndCreate(boolean cacheOnCrd) throws Exception {
if (!cacheOnCrd)
- cacheNodeFilter = new CacheNodeFilter(Collections.singletonList(getTestIgniteInstanceName(0)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(Collections.singletonList(getTestIgniteInstanceName(0)));
startServer(0, 1);
@@ -2069,7 +2069,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
exclude.add("server-" + (srvIdx + rnd.nextInt(10)));
}
- ccfg.setNodeFilter(new CacheNodeFilter(exclude));
+ ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(exclude));
}
ccfg.setName(name);
@@ -2645,28 +2645,6 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
/**
*
*/
- static class CacheNodeFilter implements IgnitePredicate<ClusterNode> {
- /** */
- private Collection<String> excludeNodes;
-
- /**
- * @param excludeNodes Nodes names.
- */
- public CacheNodeFilter(Collection<String> excludeNodes) {
- this.excludeNodes = excludeNodes;
- }
-
- /** {@inheritDoc} */
- @Override public boolean apply(ClusterNode clusterNode) {
- String name = clusterNode.attribute(ATTR_IGNITE_INSTANCE_NAME).toString();
-
- return !excludeNodes.contains(name);
- }
- }
-
- /**
- *
- */
static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
/** */
private boolean blockCustomEvt;
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
new file mode 100644
index 0000000..c64ed0b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+
+/**
+ *
+ */
+public class CachePartitionStateTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private boolean client;
+
+ /** */
+ private CacheConfiguration ccfg;
+
+ /** {@inheritDoc} */
+ protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ cfg.setClientMode(client);
+
+ if (ccfg != null) {
+ cfg.setCacheConfiguration(ccfg);
+
+ ccfg = null;
+ }
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState1_1() throws Exception {
+ partitionState1(0, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState1_2() throws Exception {
+ partitionState1(1, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState1_2_NoCacheOnCoordinator() throws Exception {
+ partitionState1(1, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState1_3() throws Exception {
+ partitionState1(100, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState2_1() throws Exception {
+ partitionState2(0, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState2_2() throws Exception {
+ partitionState2(1, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState2_2_NoCacheOnCoordinator() throws Exception {
+ partitionState2(1, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState2_3() throws Exception {
+ partitionState2(100, true);
+ }
+
+ /**
+ * @param backups Number of backups.
+ * @param crdAffNode If {@code false} cache is not created on coordinator.
+ * @throws Exception If failed.
+ */
+ private void partitionState1(int backups, boolean crdAffNode) throws Exception {
+ startGrids(3);
+
+ blockSupplySend(DEFAULT_CACHE_NAME);
+
+ CacheConfiguration ccfg = cacheConfiguration(DEFAULT_CACHE_NAME, backups);
+
+ if (!crdAffNode)
+ ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0)));
+
+ ignite(1).createCache(ccfg);
+
+ AffinityAssignment assign0 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(3, 1));
+
+ awaitPartitionMapExchange();
+
+ checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ client = true;
+
+ Ignite clientNode = startGrid(4);
+
+ checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+ clientNode.cache(DEFAULT_CACHE_NAME);
+
+ checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ client = false;
+
+ startGrid(5);
+
+ checkRebalance(DEFAULT_CACHE_NAME, false);
+
+ for (int i = 0; i < 3; i++)
+ checkNodePartitions(assign0, ignite(i).cluster().localNode(), DEFAULT_CACHE_NAME, OWNING);
+
+ AffinityAssignment assign1 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(5, 0));
+
+ checkNodePartitions(assign1, ignite(5).cluster().localNode(), DEFAULT_CACHE_NAME, MOVING);
+
+ stopBlock();
+
+ awaitPartitionMapExchange();
+
+ AffinityAssignment assign2 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(5, 1));
+
+ checkPartitionsState(assign2, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ if (!crdAffNode)
+ ignite(0).cache(DEFAULT_CACHE_NAME);
+
+ checkPartitionsState(assign2, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ startGrid(6);
+
+ awaitPartitionMapExchange();
+
+ AffinityAssignment assign3 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(6, 1));
+
+ checkPartitionsState(assign3, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+ }
+
+ /**
+ * @param backups Number of backups.
+ * @param crdAffNode If {@code false} cache is not created on coordinator.
+ * @throws Exception If failed.
+ */
+ private void partitionState2(int backups, boolean crdAffNode) throws Exception {
+ startGrids(3);
+
+ blockSupplySend(DEFAULT_CACHE_NAME);
+
+ ccfg = cacheConfiguration(DEFAULT_CACHE_NAME, backups);
+
+ if (!crdAffNode)
+ ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0)));
+
+ startGrid(4);
+
+ AffinityAssignment assign0 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(4, 0));
+
+ checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ if (!crdAffNode)
+ ignite(0).cache(DEFAULT_CACHE_NAME);
+
+ checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ stopBlock();
+
+ startGrid(5);
+
+ AffinityAssignment assign1 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(5, 1));
+
+ awaitPartitionMapExchange();
+
+ checkPartitionsState(assign1, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+ }
+
+ /**
+ * @param assign Assignments.
+ * @param cacheName Cache name.
+ * @param expState Expected state.
+ */
+ private void checkPartitionsState(AffinityAssignment assign, String cacheName, GridDhtPartitionState expState) {
+ for (Ignite node : G.allGrids())
+ checkNodePartitions(assign, node.cluster().localNode(), cacheName, expState);
+ }
+
+ /**
+ * @param assign Assignments.
+ * @param clusterNode Node.
+ * @param cacheName Cache name.
+ * @param expState Expected partitions state.
+ */
+ private void checkNodePartitions(AffinityAssignment assign,
+ ClusterNode clusterNode,
+ String cacheName,
+ GridDhtPartitionState expState)
+ {
+ Affinity<Object> aff = ignite(0).affinity(cacheName);
+
+ Set<Integer> nodeParts = new HashSet<>();
+
+ nodeParts.addAll(assign.primaryPartitions(clusterNode.id()));
+ nodeParts.addAll(assign.backupPartitions(clusterNode.id()));
+
+ log.info("Test state [node=" + clusterNode.id() + ", parts=" + nodeParts.size() + ", state=" + expState + ']');
+
+ if (grid(0).context().discovery().cacheAffinityNode(clusterNode, cacheName))
+ assertFalse(nodeParts.isEmpty());
+
+ boolean check = false;
+
+ for (Ignite node : G.allGrids()) {
+ GridCacheAdapter cache =
+ ((IgniteKernal)node).context().cache().internalCache(cacheName);
+
+ if (cache != null) {
+ check = true;
+
+ GridDhtPartitionTopology top = cache.context().topology();
+
+ GridDhtPartitionMap partsMap = top.partitions(clusterNode.id());
+
+ for (int p = 0; p < aff.partitions(); p++) {
+ if (nodeParts.contains(p)) {
+ assertNotNull(partsMap);
+ assertEquals(expState, partsMap.get(p));
+ }
+ else {
+ if (partsMap != null) {
+ GridDhtPartitionState state = partsMap.get(p);
+
+ assertTrue("Unexpected state: " + state, state == null || state == EVICTED);
+ }
+ }
+ }
+ }
+ else {
+ assertEquals(0, aff.primaryPartitions(((IgniteKernal)node).localNode()).length);
+ assertEquals(0, aff.backupPartitions(((IgniteKernal)node).localNode()).length);
+ }
+ }
+
+ assertTrue(check);
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param expDone Expected rebalance finish flag.
+ */
+ private void checkRebalance(String cacheName, boolean expDone) {
+ for (Ignite node : G.allGrids()) {
+ IgniteKernal node0 = (IgniteKernal)node;
+
+ GridCacheAdapter cache = node0.context().cache().internalCache(cacheName);
+
+ AffinityTopologyVersion topVer = node0.context().cache().context().exchange().readyAffinityVersion();
+
+ if (cache != null)
+ assertEquals(expDone, cache.context().topology().rebalanceFinished(topVer));
+ else
+ node0.context().discovery().cacheAffinityNode(node0.localNode(), cacheName);
+ }
+ }
+
+ /**
+ * @param cacheName Cache name.
+ */
+ private void blockSupplySend(String cacheName) {
+ for (Ignite node : G.allGrids())
+ blockSupplySend(TestRecordingCommunicationSpi.spi(node), cacheName);
+ }
+
+ /**
+ * @param spi SPI.
+ * @param cacheName Cache name.
+ */
+ private void blockSupplySend(TestRecordingCommunicationSpi spi, final String cacheName) {
+ final int grpId = CU.cacheId(cacheName);
+
+ spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ return msg.getClass().equals(GridDhtPartitionSupplyMessage.class) &&
+ ((GridDhtPartitionSupplyMessage)msg).groupId() == grpId;
+ }
+ });
+ }
+
+ /**
+ *
+ */
+ private void stopBlock() {
+ for (Ignite node : G.allGrids())
+ TestRecordingCommunicationSpi.spi(node).stopBlock();
+ }
+
+ /**
+ * @param name Cache name.
+ * @param backups Backups number.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration(String name, int backups) {
+ CacheConfiguration ccfg = new CacheConfiguration(name);
+
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setBackups(backups);
+
+ return ccfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java
new file mode 100644
index 0000000..a3f7d27
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgnitePredicate;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+
+/**
+ *
+ */
+public class TestCacheNodeExcludingFilter implements IgnitePredicate<ClusterNode> {
+ /** */
+ private Collection<String> excludeNodes;
+
+ /**
+ * @param excludeNodes Nodes names.
+ */
+ public TestCacheNodeExcludingFilter(Collection<String> excludeNodes) {
+ this.excludeNodes = excludeNodes;
+ }
+ /**
+ * @param excludeNodes Nodes names.
+ */
+ public TestCacheNodeExcludingFilter(String... excludeNodes) {
+ this.excludeNodes = Arrays.asList(excludeNodes);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode clusterNode) {
+ String name = clusterNode.attribute(ATTR_IGNITE_INSTANCE_NAME).toString();
+
+ return !excludeNodes.contains(name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
new file mode 100644
index 0000000..25626f4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgnitePdsCacheRestoreTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private CacheConfiguration[] ccfgs;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ if (ccfgs != null) {
+ cfg.setCacheConfiguration(ccfgs);
+
+ ccfgs = null;
+ }
+
+ MemoryConfiguration memCfg = new MemoryConfiguration();
+ memCfg.setPageSize(1024);
+ memCfg.setDefaultMemoryPolicySize(10 * 1024 * 1024);
+
+ cfg.setMemoryConfiguration(memCfg);
+
+ PersistentStoreConfiguration pCfg = new PersistentStoreConfiguration();
+
+ pCfg.setWalMode(WALMode.LOG_ONLY);
+
+ cfg.setPersistentStoreConfiguration(pCfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ GridTestUtils.deleteDbFiles();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ GridTestUtils.deleteDbFiles();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestoreAndNewCache1() throws Exception {
+ restoreAndNewCache(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestoreAndNewCache2() throws Exception {
+ restoreAndNewCache(true);
+ }
+
+ /**
+ * @param createNew If {@code true} new cache is added while node is stopped.
+ * @throws Exception If failed.
+ */
+ private void restoreAndNewCache(boolean createNew) throws Exception {
+ for (int i = 0; i < 3; i++) {
+ ccfgs = configurations1();
+
+ startGrid(i);
+ }
+
+ ignite(0).active(true);
+
+ IgniteCache<Object, Object> cache1 = ignite(2).cache("c1");
+
+ List<Integer> keys = primaryKeys(cache1, 10);
+
+ for (Integer key : keys)
+ cache1.put(key, key);
+
+ stopGrid(2);
+
+ if (createNew) {
+ // New cache is added when node is stopped.
+ ignite(0).getOrCreateCaches(Arrays.asList(configurations2()));
+ }
+ else {
+ // New cache is added on node restart.
+ ccfgs = configurations2();
+ }
+
+ startGrid(2);
+
+ cache1 = ignite(2).cache("c1");
+
+ IgniteCache<Object, Object> cache2 = ignite(2).cache("c2");
+
+ for (Integer key : keys) {
+ assertEquals(key, cache1.get(key));
+
+ assertNull(cache2.get(key));
+
+ cache2.put(key, key);
+
+ assertEquals(key, cache2.get(key));
+ }
+
+ List<Integer> nearKeys = nearKeys(cache1, 10, 0);
+
+ for (Integer key : nearKeys) {
+ assertNull(cache1.get(key));
+ assertNull(cache2.get(key));
+
+ cache2.put(key, key);
+ assertEquals(key, cache2.get(key));
+
+ cache1.put(key, key);
+ assertEquals(key, cache1.get(key));
+ }
+
+ startGrid(3);
+
+ awaitPartitionMapExchange();
+
+ for (Integer key : nearKeys) {
+ assertEquals(key, cache2.get(key));
+
+ assertEquals(key, cache1.get(key));
+ }
+ }
+
+ /**
+ * @return Configurations set 1.
+ */
+ private CacheConfiguration[] configurations1() {
+ CacheConfiguration[] ccfgs = new CacheConfiguration[1];
+
+ ccfgs[0] = cacheConfiguration("c1");
+
+ return ccfgs;
+ }
+
+ /**
+ * @return Configurations set 2.
+ */
+ private CacheConfiguration[] configurations2() {
+ CacheConfiguration[] ccfgs = new CacheConfiguration[2];
+
+ ccfgs[0] = cacheConfiguration("c1");
+ ccfgs[1] = cacheConfiguration("c2");
+
+ return ccfgs;
+ }
+
+ /**
+ * @param name Cache name.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration(String name) {
+ CacheConfiguration ccfg = new CacheConfiguration(name);
+
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ return ccfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
new file mode 100644
index 0000000..bb32d24
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest;
+
+/**
+ * Test suite.
+ */
+public class IgniteCacheTestSuite6 extends TestSuite {
+ /**
+ * @return IgniteCache test suite.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite() throws Exception {
+ TestSuite suite = new TestSuite("IgniteCache Test Suite part 6");
+
+ suite.addTestSuite(CachePartitionStateTest.class);
+
+ return suite;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index 5b562c3..5762c02 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactiva
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsClientNearCachePutGetTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDynamicCacheTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSingleNodePutGetPersistenceTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheRestoreTest;
import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCheckpointSimulationWithRealCpDisabledTest;
import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsEvictionTest;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.BPlusTreePageMemoryImplTest;
@@ -74,6 +75,8 @@ public class IgnitePdsTestSuite extends TestSuite {
suite.addTestSuite(IgniteClusterActivateDeactivateTestWithPersistence.class);
+ suite.addTestSuite(IgnitePdsCacheRestoreTest.class);
+
return suite;
}
}
[14/14] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-5569-debug
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-5569-debug
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4a86a24c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4a86a24c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4a86a24c
Branch: refs/heads/ignite-5569-debug
Commit: 4a86a24cf26908f028ce07b02cd184c2ec547401
Parents: b8fee2a aeb9336
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jul 24 11:57:52 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jul 24 11:57:52 2017 +0300
----------------------------------------------------------------------
DEVNOTES.txt | 18 +-
.../cache/CacheAffinitySharedManager.java | 51 ++-
.../dht/GridClientPartitionTopology.java | 7 +-
.../dht/GridDhtPartitionTopology.java | 12 +-
.../dht/GridDhtPartitionTopologyImpl.java | 45 +-
.../dht/preloader/GridDhtPartitionMap.java | 7 +-
.../GridDhtPartitionsExchangeFuture.java | 120 +++--
.../GridCacheDatabaseSharedManager.java | 6 +-
.../wal/FileWriteAheadLogManager.java | 93 ++--
.../processors/job/GridJobProcessor.java | 10 +-
.../dotnet/PlatformDotNetCacheStore.java | 31 ++
.../communication/tcp/TcpCommunicationSpi.java | 77 +++-
.../internal/IgniteComputeJobOneThreadTest.java | 75 ++++
.../CacheLateAffinityAssignmentTest.java | 36 +-
.../distributed/CachePartitionStateTest.java | 410 +++++++++++++++++
.../TestCacheNodeExcludingFilter.java | 53 +++
.../dht/IgniteCacheMultiTxLockSelfTest.java | 2 -
.../db/IgnitePdsCacheRestoreTest.java | 208 +++++++++
.../testsuites/IgniteCacheTestSuite6.java | 38 ++
.../testsuites/IgniteComputeGridTestSuite.java | 3 +
.../ignite/testsuites/IgnitePdsTestSuite.java | 3 +
.../apache/ignite/ml/math/DistanceMeasure.java | 2 +-
.../ignite/ml/math/EuclideanDistance.java | 3 +-
.../math/decompositions/EigenDecomposition.java | 2 +-
.../apache/ignite/ml/math/impls/CacheUtils.java | 198 +++++++--
.../ml/math/impls/matrix/AbstractMatrix.java | 4 +-
.../ignite/ml/math/impls/matrix/BlockEntry.java | 50 +++
.../ml/math/impls/matrix/CacheMatrix.java | 9 +-
.../matrix/SparseBlockDistributedMatrix.java | 208 +++++++++
.../impls/matrix/SparseDistributedMatrix.java | 26 +-
.../storage/matrix/BaseBlockMatrixKey.java | 41 ++
.../impls/storage/matrix/BlockMatrixKey.java | 144 ++++++
.../storage/matrix/BlockMatrixStorage.java | 435 +++++++++++++++++++
.../vector/SparseLocalOnHeapVectorStorage.java | 4 +-
.../ignite/ml/math/statistics/Variance.java | 1 +
.../ignite/ml/math/statistics/package-info.java | 22 +
.../org/apache/ignite/ml/math/util/MapUtil.java | 2 +-
.../ignite/ml/math/util/package-info.java | 22 +
.../java/org/apache/ignite/ml/package-info.java | 22 +
.../ml/math/MathImplDistributedTestSuite.java | 2 +
.../SparseDistributedBlockMatrixTest.java | 379 ++++++++++++++++
.../matrix/SparseDistributedMatrixTest.java | 32 +-
.../Apache.Ignite.Core.Tests.csproj | 5 +
.../Cache/Store/CacheStoreSessionTest.cs | 90 ++--
.../Store/CacheStoreSessionTestCodeConfig.cs | 68 +++
.../Store/CacheStoreSessionTestSharedFactory.cs | 48 ++
.../Cache/Store/CacheStoreTest.cs | 10 +-
.../cache-store-session-shared-factory.xml | 76 ++++
.../Config/Cache/Store/cache-store-session.xml | 20 +-
.../Impl/Cache/Store/CacheStoreInternal.cs | 14 +-
modules/web-console/frontend/app/app.js | 14 +
.../frontend/app/data/pom-dependencies.json | 5 +-
.../configuration/generator/Maven.service.js | 9 +-
.../frontend/app/modules/states/errors.state.js | 6 +-
.../frontend/app/modules/states/signin.state.js | 10 +-
.../frontend/webpack/webpack.common.js | 4 +-
56 files changed, 2961 insertions(+), 331 deletions(-)
----------------------------------------------------------------------
[11/14] ignite git commit: IGNITE-5786 .NET: Fix cache store session
handling for cross-cache transactions
Posted by sb...@apache.org.
IGNITE-5786 .NET: Fix cache store session handling for cross-cache transactions
This closes #2331
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1a3b374
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1a3b374
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1a3b374
Branch: refs/heads/ignite-5569-debug
Commit: c1a3b3744f89e27906621e62e9d73281791fcf30
Parents: 6de0571
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Fri Jul 21 17:04:39 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Fri Jul 21 17:04:39 2017 +0300
----------------------------------------------------------------------
.../dotnet/PlatformDotNetCacheStore.java | 31 +++++++
.../Apache.Ignite.Core.Tests.csproj | 5 ++
.../Cache/Store/CacheStoreSessionTest.cs | 90 +++++++++++++-------
.../Store/CacheStoreSessionTestCodeConfig.cs | 68 +++++++++++++++
.../Store/CacheStoreSessionTestSharedFactory.cs | 48 +++++++++++
.../Cache/Store/CacheStoreTest.cs | 10 ++-
.../cache-store-session-shared-factory.xml | 76 +++++++++++++++++
.../Config/Cache/Store/cache-store-session.xml | 20 ++---
.../Impl/Cache/Store/CacheStoreInternal.cs | 14 ++-
9 files changed, 320 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
index dd61a54..471eb01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
@@ -47,6 +47,7 @@ import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
/**
@@ -90,6 +91,9 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
/** Key used to distinguish session deployment. */
private static final Object KEY_SES = new Object();
+ /** Key to designate a set of stores that share current session. */
+ private static final Object KEY_SES_STORES = new Object();
+
/** */
@CacheStoreSessionResource
private CacheStoreSession ses;
@@ -337,6 +341,23 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
writer.writeLong(session());
writer.writeString(ses.cacheName());
writer.writeBoolean(commit);
+
+ // When multiple stores (caches) participate in a single transaction,
+ // they share a single session, but sessionEnd is called on each store.
+ // Same thing happens on platform side: session is shared; each store must be notified,
+ // then session should be closed.
+ Collection<Long> stores = (Collection<Long>) ses.properties().get(KEY_SES_STORES);
+ assert stores != null;
+
+ stores.remove(ptr);
+ boolean last = stores.isEmpty();
+
+ writer.writeBoolean(last);
+
+ if (last) {
+ // Session object has been released on platform side, remove marker.
+ ses.properties().remove(KEY_SES);
+ }
}
}, null);
}
@@ -415,6 +436,16 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
ses.properties().put(KEY_SES, sesPtr);
}
+ // Keep track of all stores that use current session (cross-cache tx uses single session for all caches).
+ Collection<Long> stores = (Collection<Long>) ses.properties().get(KEY_SES_STORES);
+
+ if (stores == null) {
+ stores = new HashSet<>();
+ ses.properties().put(KEY_SES_STORES, stores);
+ }
+
+ stores.add(ptr);
+
return sesPtr;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 90b7970..e4f65bc 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -89,6 +89,8 @@
<Compile Include="Cache\Query\Linq\CacheLinqTest.Misc.cs" />
<Compile Include="Cache\Query\Linq\CacheLinqTest.Custom.cs" />
<Compile Include="Cache\Query\Linq\CacheLinqTest.Contains.cs" />
+ <Compile Include="Cache\Store\CacheStoreSessionTestCodeConfig.cs" />
+ <Compile Include="Cache\Store\CacheStoreSessionTestSharedFactory.cs" />
<Compile Include="Deployment\CacheGetFunc.cs" />
<Compile Include="Deployment\GetAddressFunc.cs" />
<Compile Include="Deployment\PeerAssemblyLoadingAllApisTest.cs" />
@@ -314,6 +316,9 @@
<Content Include="Config\Cache\Affinity\affinity-function.xml">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
+ <Content Include="Config\Cache\Store\cache-store-session-shared-factory.xml">
+ <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
+ </Content>
<Content Include="Config\Cache\Store\cache-store-session.xml">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
index 315e285..818948c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
@@ -28,16 +28,13 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
/// <summary>
/// Tests for store session.
/// </summary>
- public sealed class CacheStoreSessionTest
+ public class CacheStoreSessionTest
{
- /** Grid name. */
- private const string IgniteName = "grid";
-
/** Cache 1 name. */
- private const string Cache1 = "cache1";
+ protected const string Cache1 = "cache1";
/** Cache 2 name. */
- private const string Cache2 = "cache2";
+ protected const string Cache2 = "cache2";
/** Operations. */
private static ConcurrentBag<ICollection<Operation>> _dumps;
@@ -48,11 +45,26 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
[TestFixtureSetUp]
public void BeforeTests()
{
- Ignition.Start(new IgniteConfiguration(TestUtils.GetTestConfiguration())
+ Ignition.Start(GetIgniteConfiguration());
+ }
+
+ /// <summary>
+ /// Gets the ignite configuration.
+ /// </summary>
+ protected virtual IgniteConfiguration GetIgniteConfiguration()
+ {
+ return new IgniteConfiguration(TestUtils.GetTestConfiguration())
{
- IgniteInstanceName = IgniteName,
SpringConfigUrl = @"config\cache\store\cache-store-session.xml"
- });
+ };
+ }
+
+ /// <summary>
+ /// Gets the store count.
+ /// </summary>
+ protected virtual int StoreCount
+ {
+ get { return 2; }
}
/// <summary>
@@ -61,21 +73,29 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
[TestFixtureTearDown]
public void AfterTests()
{
- Ignition.StopAll(true);
+ try
+ {
+ TestUtils.AssertHandleRegistryHasItems(Ignition.GetIgnite(), 2, 1000);
+ }
+ finally
+ {
+ Ignition.StopAll(true);
+ }
}
/// <summary>
/// Test basic session API.
/// </summary>
[Test]
+ [Timeout(30000)]
public void TestSession()
{
_dumps = new ConcurrentBag<ICollection<Operation>>();
- var ignite = Ignition.GetIgnite(IgniteName);
+ var ignite = Ignition.GetIgnite();
- var cache1 = Ignition.GetIgnite(IgniteName).GetCache<int, int>(Cache1);
- var cache2 = Ignition.GetIgnite(IgniteName).GetCache<int, int>(Cache2);
+ var cache1 = ignite.GetCache<int, int>(Cache1);
+ var cache2 = ignite.GetCache<int, int>(Cache2);
// 1. Test rollback.
using (var tx = ignite.GetTransactions().TxStart())
@@ -86,11 +106,15 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
tx.Rollback();
}
- Assert.AreEqual(1, _dumps.Count);
- var ops = _dumps.First();
- Assert.AreEqual(1, ops.Count);
+ // SessionEnd is called once per store instance.
+ Assert.AreEqual(StoreCount, _dumps.Count);
- Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.SesEnd && !op.Commit));
+ foreach (var ops in _dumps)
+ {
+ var op = ops.Single();
+ Assert.AreEqual(OperationType.SesEnd, op.Type);
+ Assert.IsFalse(op.Commit);
+ }
_dumps = new ConcurrentBag<ICollection<Operation>>();
@@ -103,13 +127,17 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
tx.Commit();
}
- Assert.AreEqual(1, _dumps.Count);
- ops = _dumps.First();
- Assert.AreEqual(3, ops.Count);
+ Assert.AreEqual(StoreCount, _dumps.Count);
- Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Write && Cache1.Equals(op.CacheName) && 1.Equals(op.Key) && 1.Equals(op.Value)));
- Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Write && Cache2.Equals(op.CacheName) && 2.Equals(op.Key) && 2.Equals(op.Value)));
- Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.SesEnd && op.Commit));
+ foreach (var ops in _dumps)
+ {
+ Assert.AreEqual(2 + StoreCount, ops.Count);
+ Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Write
+ && Cache1 == op.CacheName && 1 == op.Key && 1 == op.Value));
+ Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Write
+ && Cache2 == op.CacheName && 2 == op.Key && 2 == op.Value));
+ Assert.AreEqual(StoreCount, ops.Count(op => op.Type == OperationType.SesEnd && op.Commit));
+ }
_dumps = new ConcurrentBag<ICollection<Operation>>();
@@ -122,13 +150,17 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
tx.Commit();
}
- Assert.AreEqual(1, _dumps.Count);
- ops = _dumps.First();
- Assert.AreEqual(3, ops.Count);
+ Assert.AreEqual(StoreCount, _dumps.Count);
+ foreach (var ops in _dumps)
+ {
+ Assert.AreEqual(2 + StoreCount, ops.Count);
- Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Delete && Cache1.Equals(op.CacheName) && 1.Equals(op.Key)));
- Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Delete && Cache2.Equals(op.CacheName) && 2.Equals(op.Key)));
- Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.SesEnd && op.Commit));
+ Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Delete
+ && Cache1 == op.CacheName && 1 == op.Key));
+ Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Delete
+ && Cache2 == op.CacheName && 2 == op.Key));
+ Assert.AreEqual(StoreCount, ops.Count(op => op.Type == OperationType.SesEnd && op.Commit));
+ }
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestCodeConfig.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestCodeConfig.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestCodeConfig.cs
new file mode 100644
index 0000000..0b5f474
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestCodeConfig.cs
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Store
+{
+ using Apache.Ignite.Core.Cache.Configuration;
+ using Apache.Ignite.Core.Cache.Store;
+ using Apache.Ignite.Core.Common;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests store session with programmatic configuration (uses different store factory on Java side).
+ /// </summary>
+ [TestFixture]
+ public class CacheStoreSessionTestCodeConfig : CacheStoreSessionTest
+ {
+ /** <inheritdoc /> */
+ protected override IgniteConfiguration GetIgniteConfiguration()
+ {
+ return new IgniteConfiguration(TestUtils.GetTestConfiguration())
+ {
+ CacheConfiguration = new[]
+ {
+ new CacheConfiguration(Cache1)
+ {
+ AtomicityMode = CacheAtomicityMode.Transactional,
+ ReadThrough = true,
+ WriteThrough = true,
+ CacheStoreFactory = new StoreFactory()
+ },
+ new CacheConfiguration(Cache2)
+ {
+ AtomicityMode = CacheAtomicityMode.Transactional,
+ ReadThrough = true,
+ WriteThrough = true,
+ CacheStoreFactory = new StoreFactory()
+ }
+ }
+ };
+ }
+
+ /// <summary>
+ /// Store factory.
+ /// </summary>
+ private class StoreFactory : IFactory<ICacheStore>
+ {
+ /** <inheritdoc /> */
+ public ICacheStore CreateInstance()
+ {
+ return new Store();
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestSharedFactory.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestSharedFactory.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestSharedFactory.cs
new file mode 100644
index 0000000..2af5915
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestSharedFactory.cs
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Store
+{
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Session test with shared PlatformDotNetCacheStoreFactory,
+ /// which causes the same store instance to be used for both caches.
+ /// </summary>
+ [TestFixture]
+ public class CacheStoreSessionTestSharedFactory : CacheStoreSessionTest
+ {
+ /** <inheritdoc /> */
+ protected override IgniteConfiguration GetIgniteConfiguration()
+ {
+ return new IgniteConfiguration(TestUtils.GetTestConfiguration())
+ {
+ SpringConfigUrl = @"config\cache\store\cache-store-session-shared-factory.xml"
+ };
+ }
+
+ /** <inheritdoc /> */
+ protected override int StoreCount
+ {
+ get
+ {
+ // Shared PlatformDotNetCacheStoreFactory results in a single store instance.
+ return 1;
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
index e05f4bd..d3e4ab6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
@@ -64,7 +64,15 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
[TestFixtureTearDown]
public void AfterTests()
{
- Ignition.StopAll(true);
+ try
+ {
+ // 3 stores are expected in HandleRegistry.
+ TestUtils.AssertHandleRegistryHasItems(Ignition.GetIgnite(), 3, 1000);
+ }
+ finally
+ {
+ Ignition.StopAll(true);
+ }
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session-shared-factory.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session-shared-factory.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session-shared-factory.xml
new file mode 100644
index 0000000..05515c4
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session-shared-factory.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="storeFactory" class="org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactory">
+ <property name="typeName" value="Apache.Ignite.Core.Tests.Cache.Store.CacheStoreSessionTest+Store, Apache.Ignite.Core.Tests"/>
+ </bean>
+
+ <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="localHost" value="127.0.0.1"/>
+ <property name="connectorConfiguration"><null/></property>
+
+ <property name="cacheConfiguration">
+ <list>
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="cache1"/>
+ <property name="cacheMode" value="LOCAL"/>
+ <property name="atomicityMode" value="TRANSACTIONAL"/>
+ <property name="writeThrough" value="true"/>
+ <property name="readThrough" value="true"/>
+
+ <property name="cacheStoreFactory" ref="storeFactory" />
+ </bean>
+
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="cache2"/>
+ <property name="cacheMode" value="LOCAL"/>
+ <property name="atomicityMode" value="TRANSACTIONAL"/>
+ <property name="writeThrough" value="true"/>
+ <property name="readThrough" value="true"/>
+
+ <property name="cacheStoreFactory" ref="storeFactory" />
+ </bean>
+ </list>
+ </property>
+
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses">
+ <list>
+ <!-- In distributed environment, replace with actual host IP address. -->
+ <value>127.0.0.1:47500</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ <property name="socketTimeout" value="300" />
+ </bean>
+ </property>
+ </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session.xml
index 3cc9efa..14dc78e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session.xml
@@ -25,18 +25,10 @@
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
- <bean id="storeFactory" class="org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactory">
- <property name="typeName" value="Apache.Ignite.Core.Tests.Cache.Store.CacheStoreSessionTest+Store, Apache.Ignite.Core.Tests"/>
- </bean>
-
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="localHost" value="127.0.0.1"/>
<property name="connectorConfiguration"><null/></property>
- <property name="includeEventTypes">
- <util:constant static-field="org.apache.ignite.events.EventType.EVTS_CACHE"/>
- </property>
-
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
@@ -46,7 +38,11 @@
<property name="writeThrough" value="true"/>
<property name="readThrough" value="true"/>
- <property name="cacheStoreFactory" ref="storeFactory" />
+ <property name="cacheStoreFactory">
+ <bean class="org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactory">
+ <property name="typeName" value="Apache.Ignite.Core.Tests.Cache.Store.CacheStoreSessionTest+Store, Apache.Ignite.Core.Tests"/>
+ </bean>
+ </property>
</bean>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
@@ -56,7 +52,11 @@
<property name="writeThrough" value="true"/>
<property name="readThrough" value="true"/>
- <property name="cacheStoreFactory" ref="storeFactory" />
+ <property name="cacheStoreFactory">
+ <bean class="org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactory">
+ <property name="typeName" value="Apache.Ignite.Core.Tests.Cache.Store.CacheStoreSessionTest+Store, Apache.Ignite.Core.Tests"/>
+ </bean>
+ </property>
</bean>
</list>
</property>
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs
index f147579..df4c1ae 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs
@@ -111,6 +111,8 @@ namespace Apache.Ignite.Core.Impl.Cache.Store
CacheStoreSession ses = grid.HandleRegistry.Get<CacheStoreSession>(sesId, true);
+ // Session cache name may change in cross-cache transaction.
+ // Single session is used for all stores in cross-cache transactions.
ses.CacheName = rawReader.ReadString();
_sesProxy.SetSession(ses);
@@ -223,11 +225,19 @@ namespace Apache.Ignite.Core.Impl.Cache.Store
break;
case OpSesEnd:
- grid.HandleRegistry.Release(sesId);
+ {
+ var commit = rawReader.ReadBoolean();
+ var last = rawReader.ReadBoolean();
- _store.SessionEnd(rawReader.ReadBoolean());
+ if (last)
+ {
+ grid.HandleRegistry.Release(sesId);
+ }
+
+ _store.SessionEnd(commit);
break;
+ }
default:
throw new IgniteException("Invalid operation type: " + opType);
[03/14] ignite git commit: IGNITE-5775: Fix bug with delay for
compute jobs. This closes #2319.
Posted by sb...@apache.org.
IGNITE-5775: Fix bug with delay for compute jobs. This closes #2319.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e285f9db
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e285f9db
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e285f9db
Branch: refs/heads/ignite-5569-debug
Commit: e285f9dbebc6fd86e81e52828813be9b9d2633f2
Parents: 02e2507
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Thu Jul 20 13:24:25 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Thu Jul 20 13:24:25 2017 +0300
----------------------------------------------------------------------
.../processors/job/GridJobProcessor.java | 10 +--
.../internal/IgniteComputeJobOneThreadTest.java | 75 ++++++++++++++++++++
.../testsuites/IgniteComputeGridTestSuite.java | 3 +
3 files changed, 83 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e285f9db/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 7d2073e..cc8d903 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -1794,6 +1794,11 @@ public class GridJobProcessor extends GridProcessorAdapter {
return;
}
+ if (!activeJobs.remove(worker.getJobId(), worker))
+ cancelledJobs.remove(worker.getJobId(), worker);
+
+ heldJobs.remove(worker.getJobId());
+
try {
handleCollisions();
}
@@ -1801,11 +1806,6 @@ public class GridJobProcessor extends GridProcessorAdapter {
rwLock.readUnlock();
}
}
-
- if (!activeJobs.remove(worker.getJobId(), worker))
- cancelledJobs.remove(worker.getJobId(), worker);
-
- heldJobs.remove(worker.getJobId());
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e285f9db/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeJobOneThreadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeJobOneThreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeJobOneThreadTest.java
new file mode 100644
index 0000000..76f669e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeJobOneThreadTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.collision.fifoqueue.FifoQueueCollisionSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests that there are no gaps between compute jobs.
+ */
+public class IgniteComputeJobOneThreadTest extends GridCommonAbstractTest {
+ @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+ FifoQueueCollisionSpi colSpi = new FifoQueueCollisionSpi();
+ colSpi.setParallelJobsNumber(1);
+
+ return super.getConfiguration(name)
+ .setMetricsUpdateFrequency(10000)
+ .setCollisionSpi(colSpi);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrid(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 10000;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoTimeout() throws Exception {
+ Ignite ignite = ignite(0);
+
+ IgniteFuture fut = null;
+
+ for (int i = 0; i < 10000; i++) {
+ fut = ignite.compute().runAsync(new IgniteRunnable() {
+ @Override public void run() {
+
+ }
+ });
+ }
+
+ fut.get();
+
+ assertTrue(true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e285f9db/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 3f3bc53..ac3de73 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.GridTaskNameAnnotationSelfTest;
import org.apache.ignite.internal.GridTaskResultCacheSelfTest;
import org.apache.ignite.internal.GridTaskTimeoutSelfTest;
import org.apache.ignite.internal.IgniteComputeEmptyClusterGroupTest;
+import org.apache.ignite.internal.IgniteComputeJobOneThreadTest;
import org.apache.ignite.internal.IgniteComputeTopologyExceptionTest;
import org.apache.ignite.internal.IgniteExecutorServiceTest;
import org.apache.ignite.internal.IgniteExplicitImplicitDeploymentSelfTest;
@@ -163,6 +164,8 @@ public class IgniteComputeGridTestSuite {
suite.addTestSuite(IgniteComputeCustomExecutorConfigurationSelfTest.class);
suite.addTestSuite(IgniteComputeCustomExecutorSelfTest.class);
+ suite.addTestSuite(IgniteComputeJobOneThreadTest.class);
+
return suite;
}
}
[02/14] ignite git commit: IGNITE-5788 Web Console: Fixed
dependencies for maven project with c3p0.
Posted by sb...@apache.org.
IGNITE-5788 Web Console: Fixed dependencies for maven project with c3p0.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/02a1bdca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/02a1bdca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/02a1bdca
Branch: refs/heads/ignite-5569-debug
Commit: 02a1bdca57ce6af7fe7636b0a9f99048c89b88b6
Parents: 70d0f99
Author: Andrey Novikov <an...@gridgain.com>
Authored: Thu Jul 20 15:47:49 2017 +0700
Committer: Andrey Novikov <an...@gridgain.com>
Committed: Thu Jul 20 15:47:49 2017 +0700
----------------------------------------------------------------------
modules/web-console/frontend/app/data/pom-dependencies.json | 5 ++++-
.../app/modules/configuration/generator/Maven.service.js | 9 +++------
modules/web-console/frontend/webpack/webpack.common.js | 4 ++--
3 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/02a1bdca/modules/web-console/frontend/app/data/pom-dependencies.json
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/data/pom-dependencies.json b/modules/web-console/frontend/app/data/pom-dependencies.json
index 945e3f5..8d3fa81 100644
--- a/modules/web-console/frontend/app/data/pom-dependencies.json
+++ b/modules/web-console/frontend/app/data/pom-dependencies.json
@@ -11,7 +11,10 @@
"HadoopIgfsJcl": {"artifactId": "ignite-hadoop"},
"SLF4J": {"artifactId": "ignite-slf4j"},
- "Generic": {"groupId": "com.mchange", "artifactId": "c3p0", "version": "0.9.5.2"},
+ "Generic": [
+ {"groupId": "com.mchange", "artifactId": "c3p0", "version": "0.9.5.2"},
+ {"groupId": "com.mchange", "artifactId": "mchange-commons-java", "version": "0.2.11"}
+ ],
"MySQL": {"groupId": "mysql", "artifactId": "mysql-connector-java", "version": "5.1.40"},
"PostgreSQL": {"groupId": "org.postgresql", "artifactId": "postgresql", "version": "9.4.1212.jre7"},
"H2": {"groupId": "com.h2database", "artifactId": "h2", "version": [
http://git-wip-us.apache.org/repos/asf/ignite/blob/02a1bdca/modules/web-console/frontend/app/modules/configuration/generator/Maven.service.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/modules/configuration/generator/Maven.service.js b/modules/web-console/frontend/app/modules/configuration/generator/Maven.service.js
index 81d7d10..abbada9 100644
--- a/modules/web-console/frontend/app/modules/configuration/generator/Maven.service.js
+++ b/modules/web-console/frontend/app/modules/configuration/generator/Maven.service.js
@@ -47,12 +47,9 @@ export default class IgniteMavenGenerator {
return _.isArray(version) ? _.find(version, (v) => versionService.since(igniteVer, v.range)).version : version;
};
- if (!_.has(POM_DEPENDENCIES, key))
- return;
-
- const {groupId, artifactId, version, jar} = POM_DEPENDENCIES[key];
-
- this.addDependency(deps, groupId || 'org.apache.ignite', artifactId, extractVersion(version) || dfltVer, jar);
+ _.forEach(POM_DEPENDENCIES[key], ({groupId, artifactId, version, jar}) => {
+ this.addDependency(deps, groupId || 'org.apache.ignite', artifactId, extractVersion(version) || dfltVer, jar);
+ });
}
addResource(sb, dir, exclude) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/02a1bdca/modules/web-console/frontend/webpack/webpack.common.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/webpack/webpack.common.js b/modules/web-console/frontend/webpack/webpack.common.js
index a303d6e..5a3763e 100644
--- a/modules/web-console/frontend/webpack/webpack.common.js
+++ b/modules/web-console/frontend/webpack/webpack.common.js
@@ -138,7 +138,7 @@ export default {
},
{
test: /\.(jpe?g|png|gif)$/i,
- loader: 'file?name=assets/images/[name]_[hash].[ext]'
+ loader: 'file?name=assets/images/[name].[hash].[ext]'
},
{
test: require.resolve('jquery'),
@@ -178,7 +178,7 @@ export default {
new HtmlWebpackPlugin({
template: './views/index.pug'
}),
- new ExtractTextPlugin({filename: 'assets/css/[name].css', allChunks: true}),
+ new ExtractTextPlugin({filename: 'assets/css/[name].[hash].css', allChunks: true}),
new CopyWebpackPlugin([
{ context: 'public', from: '**/*.png' },
{ context: 'public', from: '**/*.svg' },
[04/14] ignite git commit: IGNITE-5776: Add option to turn on filter
reachable addresses in TcpCommunicationSpi. This closes #2317.
Posted by sb...@apache.org.
IGNITE-5776: Add option to turn on filter reachable addresses in TcpCommunicationSpi. This closes #2317.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bd7a08e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bd7a08e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bd7a08e3
Branch: refs/heads/ignite-5569-debug
Commit: bd7a08e31d03b2c51b225cf388dc1197348a1593
Parents: e285f9d
Author: Evgenii Zhuravlev <ez...@gridgain.com>
Authored: Thu Jul 20 13:32:18 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Thu Jul 20 13:32:18 2017 +0300
----------------------------------------------------------------------
.../communication/tcp/TcpCommunicationSpi.java | 77 ++++++++++++++------
1 file changed, 56 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bd7a08e3/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 35d3032..5b952e8 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -311,6 +311,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** Default value for {@code TCP_NODELAY} socket option (value is <tt>true</tt>). */
public static final boolean DFLT_TCP_NODELAY = true;
+ /** Default value for {@code FILTER_REACHABLE_ADDRESSES} socket option (value is <tt>false</tt>). */
+ public static final boolean DFLT_FILTER_REACHABLE_ADDRESSES = false;
+
/** Default received messages threshold for sending ack. */
public static final int DFLT_ACK_SND_THRESHOLD = 32;
@@ -1016,6 +1019,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** {@code TCP_NODELAY} option value for created sockets. */
private boolean tcpNoDelay = DFLT_TCP_NODELAY;
+ /** {@code FILTER_REACHABLE_ADDRESSES} option value for created sockets. */
+ private boolean filterReachableAddresses = DFLT_FILTER_REACHABLE_ADDRESSES;
+
/** Number of received messages after which acknowledgment is sent. */
private int ackSndThreshold = DFLT_ACK_SND_THRESHOLD;
@@ -1626,6 +1632,33 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
/**
+ * Gets value for {@code FILTER_REACHABLE_ADDRESSES} socket option.
+ *
+ * @return {@code True} if needed to filter reachable addresses.
+ */
+ public boolean isFilterReachableAddresses() {
+ return filterReachableAddresses;
+ }
+
+ /**
+ * Setting this option to {@code true} enables filtering of reachable
+ * addresses when creating a TCP client.
+ * <p>
+ * Usually it's advised to set this value to {@code false}.
+ * <p>
+ * If not provided, default value is {@link #DFLT_FILTER_REACHABLE_ADDRESSES}.
+ *
+ * @param filterReachableAddresses {@code True} to filter reachable addresses.
+ * @return {@code this} for chaining.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public TcpCommunicationSpi setFilterReachableAddresses(boolean filterReachableAddresses) {
+ this.filterReachableAddresses = filterReachableAddresses;
+
+ return this;
+ }
+
+ /**
* Sets receive buffer size for sockets created or accepted by this SPI.
* <p>
* If not provided, default is {@link #DFLT_SOCK_BUF_SIZE}.
@@ -2952,35 +2985,37 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (isExtAddrsExist)
addrs.addAll(extAddrs);
- Set<InetAddress> allInetAddrs = U.newHashSet(addrs.size());
+ if (filterReachableAddresses) {
+ Set<InetAddress> allInetAddrs = U.newHashSet(addrs.size());
- for (InetSocketAddress addr : addrs) {
- // Skip unresolved as addr.getAddress() can return null.
- if(!addr.isUnresolved())
- allInetAddrs.add(addr.getAddress());
- }
+ for (InetSocketAddress addr : addrs) {
+ // Skip unresolved as addr.getAddress() can return null.
+ if (!addr.isUnresolved())
+ allInetAddrs.add(addr.getAddress());
+ }
- List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs);
+ List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs);
- if (reachableInetAddrs.size() < allInetAddrs.size()) {
- LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size());
+ if (reachableInetAddrs.size() < allInetAddrs.size()) {
+ LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size());
- List<InetSocketAddress> unreachableInetAddr = new ArrayList<>(allInetAddrs.size() - reachableInetAddrs.size());
+ List<InetSocketAddress> unreachableInetAddr = new ArrayList<>(allInetAddrs.size() - reachableInetAddrs.size());
- for (InetSocketAddress addr : addrs) {
- if (reachableInetAddrs.contains(addr.getAddress()))
- addrs0.add(addr);
- else
- unreachableInetAddr.add(addr);
- }
+ for (InetSocketAddress addr : addrs) {
+ if (reachableInetAddrs.contains(addr.getAddress()))
+ addrs0.add(addr);
+ else
+ unreachableInetAddr.add(addr);
+ }
- addrs0.addAll(unreachableInetAddr);
+ addrs0.addAll(unreachableInetAddr);
- addrs = addrs0;
- }
+ addrs = addrs0;
+ }
- if (log.isDebugEnabled())
- log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs.toString() + ']');
+ if (log.isDebugEnabled())
+ log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs.toString() + ']');
+ }
boolean conn = false;
GridCommunicationClient client = null;
[12/14] ignite git commit: Merge branch 'ignite-2.1.3'
Posted by sb...@apache.org.
Merge branch 'ignite-2.1.3'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca496f6e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca496f6e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca496f6e
Branch: refs/heads/ignite-5569-debug
Commit: ca496f6e9e48a107abb72a4fea87b5b11da66806
Parents: 0adaf6e c1a3b37
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Fri Jul 21 17:05:10 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Fri Jul 21 17:05:10 2017 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionMap.java | 7 +-
.../wal/FileWriteAheadLogManager.java | 93 +++++++++++++-------
.../dotnet/PlatformDotNetCacheStore.java | 31 +++++++
.../Apache.Ignite.Core.Tests.csproj | 5 ++
.../Cache/Store/CacheStoreSessionTest.cs | 90 +++++++++++++------
.../Store/CacheStoreSessionTestCodeConfig.cs | 68 ++++++++++++++
.../Store/CacheStoreSessionTestSharedFactory.cs | 48 ++++++++++
.../Cache/Store/CacheStoreTest.cs | 10 ++-
.../cache-store-session-shared-factory.xml | 76 ++++++++++++++++
.../Config/Cache/Store/cache-store-session.xml | 20 ++---
.../Impl/Cache/Store/CacheStoreInternal.cs | 14 ++-
modules/web-console/frontend/app/app.js | 14 +++
.../frontend/app/data/pom-dependencies.json | 5 +-
.../configuration/generator/Maven.service.js | 9 +-
.../frontend/app/modules/states/errors.state.js | 6 +-
.../frontend/app/modules/states/signin.state.js | 10 ++-
.../frontend/webpack/webpack.common.js | 4 +-
17 files changed, 422 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
[07/14] ignite git commit: IGNITE-3950 Deadlock when exchange starts
with pending explicit lock
Posted by sb...@apache.org.
IGNITE-3950 Deadlock when exchange starts with pending explicit lock
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/48f29943
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/48f29943
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/48f29943
Branch: refs/heads/ignite-5569-debug
Commit: 48f29943efa9cbfc1e2c4068f7e16373dec2b0b9
Parents: db43b0c
Author: Vitaliy Biryukov <Bi...@gmail.com>
Authored: Fri Jul 21 15:29:23 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Jul 21 15:29:23 2017 +0300
----------------------------------------------------------------------
.../cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java | 2 --
1 file changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48f29943/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
index 6fd5dd3..11b0eea 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
@@ -117,8 +117,6 @@ public class IgniteCacheMultiTxLockSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testExplicitLockManyKeysWithClient() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-3950");
-
checkExplicitLock(4, true);
}
[09/14] ignite git commit: IGNITE-5752 Fixed updateSequence updating
in GridDhtPartitionMap. - Fixes #2297.
Posted by sb...@apache.org.
IGNITE-5752 Fixed updateSequence updating in GridDhtPartitionMap. - Fixes #2297.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/199b9543
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/199b9543
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/199b9543
Branch: refs/heads/ignite-5569-debug
Commit: 199b954345f179851718acd131188506668cd4f3
Parents: 02a1bdc
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Fri Jul 21 16:29:15 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Jul 21 16:29:15 2017 +0300
----------------------------------------------------------------------
.../cache/distributed/dht/preloader/GridDhtPartitionMap.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/199b9543/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
index cfd4400..735ca1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
@@ -24,7 +24,6 @@ import java.io.ObjectOutput;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
@@ -202,9 +201,13 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext
* @return Old update sequence value.
*/
public long updateSequence(long updateSeq, AffinityTopologyVersion topVer) {
+ assert topVer.compareTo(top) >= 0 : "Invalid topology version [cur=" + top + ", new=" + topVer + "]";
+
long old = this.updateSeq;
- assert updateSeq >= old : "Invalid update sequence [cur=" + old + ", new=" + updateSeq + ']';
+ // Overwrite the update sequence without checking it when the topology version is greater.
+ if (topVer.compareTo(top) == 0)
+ assert updateSeq >= old : "Invalid update sequence [cur=" + old + ", new=" + updateSeq + ']';
this.updateSeq = updateSeq;
[06/14] ignite git commit: IGNITE-5791 Block matrix introduction
Posted by sb...@apache.org.
IGNITE-5791 Block matrix introduction
This closes #2326
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0d2b989d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0d2b989d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0d2b989d
Branch: refs/heads/ignite-5569-debug
Commit: 0d2b989d2be62533a36061940497a734463b5f10
Parents: db43b0c
Author: Yury Babak <yb...@gridgain.com>
Authored: Fri Jul 21 15:28:21 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Fri Jul 21 15:28:21 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/ml/math/DistanceMeasure.java | 2 +-
.../ignite/ml/math/EuclideanDistance.java | 3 +-
.../math/decompositions/EigenDecomposition.java | 2 +-
.../apache/ignite/ml/math/impls/CacheUtils.java | 198 +++++++--
.../ml/math/impls/matrix/AbstractMatrix.java | 4 +-
.../ignite/ml/math/impls/matrix/BlockEntry.java | 50 +++
.../ml/math/impls/matrix/CacheMatrix.java | 9 +-
.../matrix/SparseBlockDistributedMatrix.java | 208 +++++++++
.../impls/matrix/SparseDistributedMatrix.java | 26 +-
.../storage/matrix/BaseBlockMatrixKey.java | 41 ++
.../impls/storage/matrix/BlockMatrixKey.java | 144 ++++++
.../storage/matrix/BlockMatrixStorage.java | 435 +++++++++++++++++++
.../vector/SparseLocalOnHeapVectorStorage.java | 4 +-
.../ignite/ml/math/statistics/Variance.java | 1 +
.../ignite/ml/math/statistics/package-info.java | 22 +
.../org/apache/ignite/ml/math/util/MapUtil.java | 2 +-
.../ignite/ml/math/util/package-info.java | 22 +
.../java/org/apache/ignite/ml/package-info.java | 22 +
.../ml/math/MathImplDistributedTestSuite.java | 2 +
.../SparseDistributedBlockMatrixTest.java | 379 ++++++++++++++++
.../matrix/SparseDistributedMatrixTest.java | 32 +-
21 files changed, 1528 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/DistanceMeasure.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/DistanceMeasure.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/DistanceMeasure.java
index 09be0c3..df235a7 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/DistanceMeasure.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/DistanceMeasure.java
@@ -34,5 +34,5 @@ public interface DistanceMeasure extends Externalizable {
* @return the distance between the two vectors
* @throws CardinalityException if the array lengths differ.
*/
- double compute(Vector a, Vector b) throws CardinalityException;
+ public double compute(Vector a, Vector b) throws CardinalityException;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/EuclideanDistance.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/EuclideanDistance.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/EuclideanDistance.java
index 5f962ce..edc11dc 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/EuclideanDistance.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/EuclideanDistance.java
@@ -30,8 +30,7 @@ public class EuclideanDistance implements DistanceMeasure {
private static final long serialVersionUID = 1717556319784040040L;
/** {@inheritDoc} */
- @Override
- public double compute(Vector a, Vector b)
+ @Override public double compute(Vector a, Vector b)
throws CardinalityException {
return MatrixUtil.localCopyOf(a).minus(b).kNorm(2.0);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/decompositions/EigenDecomposition.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/decompositions/EigenDecomposition.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/decompositions/EigenDecomposition.java
index d0e91a5..a5c92e6 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/decompositions/EigenDecomposition.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/decompositions/EigenDecomposition.java
@@ -446,7 +446,7 @@ public class EigenDecomposition implements Destroyable {
// Store roots isolated by balanc and compute matrix norm
- double norm = h.foldMap(Functions.PLUS, Functions.ABS, 0.0);
+ double norm = h.foldMap(Functions.PLUS, Functions.ABS, 0.0d);
// Outer loop over eigenvalue index
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
index 1bda5e6..369840b 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
@@ -39,11 +39,16 @@ import org.apache.ignite.ml.math.KeyMapper;
import org.apache.ignite.ml.math.ValueMapper;
import org.apache.ignite.ml.math.functions.IgniteBiFunction;
import org.apache.ignite.ml.math.functions.IgniteConsumer;
+import org.apache.ignite.ml.math.functions.IgniteDoubleFunction;
import org.apache.ignite.ml.math.functions.IgniteFunction;
-import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage;
+import org.apache.ignite.ml.math.impls.matrix.BlockEntry;
+import org.apache.ignite.ml.math.impls.storage.matrix.BlockMatrixKey;
+import org.apache.ignite.internal.util.typedef.internal.A;
/**
* Distribution-related misc. support.
+ *
+ * TODO: IGNITE-5102, fix sparse key filters
*/
public class CacheUtils {
/**
@@ -127,19 +132,38 @@ public class CacheUtils {
* @param matrixUuid Matrix UUID.
* @return Sum obtained using sparse logic.
*/
- public static <K, V> double sparseSum(IgniteUuid matrixUuid) {
- Collection<Double> subSums = fold(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce, Double acc) -> {
- Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> entry = ce.entry();
- if (entry.getKey().get2().equals(matrixUuid)) {
- Map<Integer, Double> map = entry.getValue();
+ @SuppressWarnings("unchecked")
+ public static <K, V> double sparseSum(IgniteUuid matrixUuid, String cacheName) {
+ A.notNull(matrixUuid, "matrixUuid");
+ A.notNull(cacheName, "cacheName");
+
+ Collection<Double> subSums = fold(cacheName, (CacheEntry<K, V> ce, Double acc) -> {
+ V v = ce.entry().getValue();
+
+ double sum = 0.0;
- double sum = sum(map.values());
+ if (v instanceof Map) {
+ Map<Integer, Double> map = (Map<Integer, Double>)v;
- return acc == null ? sum : acc + sum;
+ sum = sum(map.values());
+ }
+ else if (v instanceof BlockEntry) {
+ BlockEntry be = (BlockEntry)v;
+
+ sum = be.sum();
}
else
- return acc;
- }, key -> key.get2().equals(matrixUuid));
+ throw new UnsupportedOperationException();
+
+ return acc == null ? sum : acc + sum;
+ }, key -> {
+ if (key instanceof BlockMatrixKey)
+ return ((BlockMatrixKey)key).matrixId().equals(matrixUuid);
+ else if (key instanceof IgniteBiTuple)
+ return ((IgniteBiTuple<Integer, IgniteUuid>)key).get2().equals(matrixUuid);
+ else
+ throw new UnsupportedOperationException();
+ });
return sum(subSums);
}
@@ -186,23 +210,42 @@ public class CacheUtils {
* @param matrixUuid Matrix UUID.
* @return Minimum value obtained using sparse logic.
*/
- public static <K, V> double sparseMin(IgniteUuid matrixUuid) {
- Collection<Double> mins = fold(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce, Double acc) -> {
- Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> entry = ce.entry();
+ @SuppressWarnings("unchecked")
+ public static <K, V> double sparseMin(IgniteUuid matrixUuid, String cacheName) {
+ A.notNull(matrixUuid, "matrixUuid");
+ A.notNull(cacheName, "cacheName");
- if (entry.getKey().get2().equals(matrixUuid)) {
- Map<Integer, Double> map = entry.getValue();
+ Collection<Double> mins = fold(cacheName, (CacheEntry<K, V> ce, Double acc) -> {
+ V v = ce.entry().getValue();
- double min = Collections.min(map.values());
+ double min;
- if (acc == null)
- return min;
- else
- return Math.min(acc, min);
+ if (v instanceof Map) {
+ Map<Integer, Double> map = (Map<Integer, Double>)v;
+
+ min = Collections.min(map.values());
+ }
+ else if (v instanceof BlockEntry) {
+ BlockEntry be = (BlockEntry)v;
+
+ min = be.minValue();
}
else
- return acc;
- }, key -> key.get2().equals(matrixUuid));
+ throw new UnsupportedOperationException();
+
+ if (acc == null)
+ return min;
+ else
+ return Math.min(acc, min);
+
+ }, key -> {
+ if (key instanceof BlockMatrixKey)
+ return ((BlockMatrixKey)key).matrixId().equals(matrixUuid);
+ else if (key instanceof IgniteBiTuple)
+ return ((IgniteBiTuple<Integer, IgniteUuid>)key).get2().equals(matrixUuid);
+ else
+ throw new UnsupportedOperationException();
+ });
return Collections.min(mins);
}
@@ -211,22 +254,42 @@ public class CacheUtils {
* @param matrixUuid Matrix UUID.
* @return Maximum value obtained using sparse logic.
*/
- public static <K, V> double sparseMax(IgniteUuid matrixUuid) {
- Collection<Double> maxes = fold(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce, Double acc) -> {
- Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> entry = ce.entry();
- if (entry.getKey().get2().equals(matrixUuid)) {
- Map<Integer, Double> map = entry.getValue();
+ @SuppressWarnings("unchecked")
+ public static <K, V> double sparseMax(IgniteUuid matrixUuid, String cacheName) {
+ A.notNull(matrixUuid, "matrixUuid");
+ A.notNull(cacheName, "cacheName");
+
+ Collection<Double> maxes = fold(cacheName, (CacheEntry<K, V> ce, Double acc) -> {
+ V v = ce.entry().getValue();
- double max = Collections.max(map.values());
+ double max;
- if (acc == null)
- return max;
- else
- return Math.max(acc, max);
+ if (v instanceof Map) {
+ Map<Integer, Double> map = (Map<Integer, Double>)v;
+
+ max = Collections.max(map.values());
+ }
+ else if (v instanceof BlockEntry) {
+ BlockEntry be = (BlockEntry)v;
+
+ max = be.maxValue();
}
else
- return acc;
- }, key -> key.get2().equals(matrixUuid));
+ throw new UnsupportedOperationException();
+
+ if (acc == null)
+ return max;
+ else
+ return Math.max(acc, max);
+
+ }, key -> {
+ if (key instanceof BlockMatrixKey)
+ return ((BlockMatrixKey)key).matrixId().equals(matrixUuid);
+ else if (key instanceof IgniteBiTuple)
+ return ((IgniteBiTuple<Integer, IgniteUuid>)key).get2().equals(matrixUuid);
+ else
+ throw new UnsupportedOperationException();
+ });
return Collections.max(maxes);
}
@@ -279,17 +342,41 @@ public class CacheUtils {
* @param matrixUuid Matrix UUID.
* @param mapper Mapping {@link IgniteFunction}.
*/
- public static <K, V> void sparseMap(IgniteUuid matrixUuid, IgniteFunction<Double, Double> mapper) {
- foreach(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce) -> {
- IgniteBiTuple k = ce.entry().getKey();
+ @SuppressWarnings("unchecked")
+ public static <K, V> void sparseMap(IgniteUuid matrixUuid, IgniteDoubleFunction<Double> mapper, String cacheName) {
+ A.notNull(matrixUuid, "matrixUuid");
+ A.notNull(cacheName, "cacheName");
+ A.notNull(mapper, "mapper");
+
+ foreach(cacheName, (CacheEntry<K, V> ce) -> {
+ K k = ce.entry().getKey();
+
+ V v = ce.entry().getValue();
- Map<Integer, Double> v = ce.entry().getValue();
+ if (v instanceof Map) {
+ Map<Integer, Double> map = (Map<Integer, Double>)v;
- for (Map.Entry<Integer, Double> e : v.entrySet())
- e.setValue(mapper.apply(e.getValue()));
+ for (Map.Entry<Integer, Double> e : (map.entrySet()))
+ e.setValue(mapper.apply(e.getValue()));
+
+ }
+ else if (v instanceof BlockEntry) {
+ BlockEntry be = (BlockEntry)v;
+
+ be.map(mapper);
+ }
+ else
+ throw new UnsupportedOperationException();
ce.cache().put(k, v);
- }, key -> key.get2().equals(matrixUuid));
+ }, key -> {
+ if (key instanceof BlockMatrixKey)
+ return ((BlockMatrixKey)key).matrixId().equals(matrixUuid);
+ else if (key instanceof IgniteBiTuple)
+ return ((IgniteBiTuple<Integer, IgniteUuid>)key).get2().equals(matrixUuid);
+ else
+ throw new UnsupportedOperationException();
+ });
}
/**
@@ -327,8 +414,7 @@ public class CacheUtils {
// Iterate over given partition.
// Query returns an empty cursor if this partition is not stored on this node.
- for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part,
- (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k)))))
+ for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part, (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k)))))
fun.accept(new CacheEntry<>(entry, cache));
}
});
@@ -387,12 +473,34 @@ public class CacheUtils {
});
}
+ /**
+ * Distributed version of fold operation.
+ *
+ * @param cacheName Cache name.
+ * @param folder Folder.
+ * @param keyFilter Key filter.
+ * @param accumulator Accumulator.
+ * @param zeroVal Zero value.
+ */
public static <K, V, A> A distributedFold(String cacheName, IgniteBiFunction<Cache.Entry<K, V>, A, A> folder,
IgnitePredicate<K> keyFilter, BinaryOperator<A> accumulator, A zeroVal) {
return sparseFold(cacheName, folder, keyFilter, accumulator, zeroVal, null, null, 0,
false);
}
+ /**
+ * Sparse version of fold. This method is also applicable to sparse zeroes.
+ *
+ * @param cacheName Cache name.
+ * @param folder Folder.
+ * @param keyFilter Key filter.
+ * @param accumulator Accumulator.
+ * @param zeroVal Zero value.
+ * @param defVal Def value.
+ * @param defKey Def key.
+ * @param defValCnt Def value count.
+ * @param isNilpotent Is nilpotent.
+ */
private static <K, V, A> A sparseFold(String cacheName, IgniteBiFunction<Cache.Entry<K, V>, A, A> folder,
IgnitePredicate<K> keyFilter, BinaryOperator<A> accumulator, A zeroVal, V defVal, K defKey, long defValCnt,
boolean isNilpotent) {
@@ -411,7 +519,7 @@ public class CacheUtils {
// Use affinity in filter for ScanQuery. Otherwise we accept consumer in each node which is wrong.
Affinity affinity = ignite.affinity(cacheName);
- ClusterNode localNode = ignite.cluster().localNode();
+ ClusterNode locNode = ignite.cluster().localNode();
A a = zeroVal;
@@ -422,7 +530,7 @@ public class CacheUtils {
// Iterate over given partition.
// Query returns an empty cursor if this partition is not stored on this node.
for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part,
- (k, v) -> affinity.mapPartitionToNode(p) == localNode && (keyFilter == null || keyFilter.apply(k)))))
+ (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k)))))
a = folder.apply(entry, a);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
index d1d3904..3dc9b43 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
@@ -503,7 +503,7 @@ public abstract class AbstractMatrix implements Matrix {
/** {@inheritDoc} */
@Override public double determinant() {
- //TODO: This decomposition should be cached
+ //TODO: IGNITE-5799, This decomposition should be cached
LUDecomposition dec = new LUDecomposition(this);
double res = dec.determinant();
dec.destroy();
@@ -515,7 +515,7 @@ public abstract class AbstractMatrix implements Matrix {
if (rowSize() != columnSize())
throw new CardinalityException(rowSize(), columnSize());
- //TODO: This decomposition should be cached
+ //TODO: IGNITE-5799, This decomposition should be cached
LUDecomposition dec = new LUDecomposition(this);
Matrix res = dec.solve(likeIdentity());
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/BlockEntry.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/BlockEntry.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/BlockEntry.java
new file mode 100644
index 0000000..47f07ce
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/BlockEntry.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.impls.matrix;
+
+import org.apache.ignite.ml.math.Matrix;
+
+/**
+ * Block for {@link SparseBlockDistributedMatrix}.
+ */
+public final class BlockEntry extends SparseLocalOnHeapMatrix {
+ /** Max block size. */
+ public static final int MAX_BLOCK_SIZE = 32;
+
+ /** */
+ public BlockEntry() {
+ // No-op.
+ }
+
+ /** */
+ public BlockEntry(int row, int col) {
+ super(row, col);
+
+ assert col <= MAX_BLOCK_SIZE;
+ assert row <= MAX_BLOCK_SIZE;
+ }
+
+ /** */
+ public BlockEntry(Matrix mtx) {
+ assert mtx.columnSize() <= MAX_BLOCK_SIZE;
+ assert mtx.rowSize() <= MAX_BLOCK_SIZE;
+
+ setStorage(mtx.getStorage());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/CacheMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/CacheMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/CacheMatrix.java
index a7f0afc..7f00bcb 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/CacheMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/CacheMatrix.java
@@ -65,7 +65,6 @@ public class CacheMatrix<K, V> extends AbstractMatrix {
/**
*
- *
*/
@SuppressWarnings({"unchecked"})
private CacheMatrixStorage<K, V> storage() {
@@ -93,7 +92,7 @@ public class CacheMatrix<K, V> extends AbstractMatrix {
* @param d Value to divide to.
*/
@Override public Matrix divide(double d) {
- return mapOverValues((Double v) -> v / d);
+ return mapOverValues(v -> v / d);
}
/**
@@ -102,7 +101,7 @@ public class CacheMatrix<K, V> extends AbstractMatrix {
* @param x Value to add.
*/
@Override public Matrix plus(double x) {
- return mapOverValues((Double v) -> v + x);
+ return mapOverValues(v -> v + x);
}
/**
@@ -111,12 +110,12 @@ public class CacheMatrix<K, V> extends AbstractMatrix {
* @param x Value to multiply to.
*/
@Override public Matrix times(double x) {
- return mapOverValues((Double v) -> v * x);
+ return mapOverValues(v -> v * x);
}
/** {@inheritDoc} */
@Override public Matrix assign(double val) {
- return mapOverValues((Double v) -> val);
+ return mapOverValues(v -> val);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java
new file mode 100644
index 0000000..b3481f9
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.impls.matrix;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.ml.math.Matrix;
+import org.apache.ignite.ml.math.StorageConstants;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.exceptions.CardinalityException;
+import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException;
+import org.apache.ignite.ml.math.functions.IgniteDoubleFunction;
+import org.apache.ignite.ml.math.impls.CacheUtils;
+import org.apache.ignite.ml.math.impls.storage.matrix.BlockMatrixKey;
+import org.apache.ignite.ml.math.impls.storage.matrix.BlockMatrixStorage;
+
+/**
+ * Sparse block distributed matrix. This matrix is represented by 32x32 blocks of {@link BlockEntry}.
+ *
+ * Uses a separate cache with keys {@link BlockMatrixKey} and values {@link BlockEntry}.
+ */
+public class SparseBlockDistributedMatrix extends AbstractMatrix implements StorageConstants {
+ /**
+ *
+ */
+ public SparseBlockDistributedMatrix() {
+ // No-op.
+ }
+
+ /**
+ * @param rows Amount of rows in the matrix.
+ * @param cols Amount of columns in the matrix.
+ */
+ public SparseBlockDistributedMatrix(int rows, int cols) {
+ assert rows > 0;
+ assert cols > 0;
+
+ setStorage(new BlockMatrixStorage(rows, cols));
+ }
+
+ /**
+ * Return the same matrix with updated values (broken contract).
+ *
+ * @param d Value to divide by.
+ */
+ @Override public Matrix divide(double d) {
+ return mapOverValues(v -> v / d);
+ }
+
+ /**
+ * Return the same matrix with updated values (broken contract).
+ *
+ * @param x Value to add.
+ */
+ @Override public Matrix plus(double x) {
+ return mapOverValues(v -> v + x);
+ }
+
+ /**
+ * Return the same matrix with updated values (broken contract).
+ *
+ * @param x Value to multiply by.
+ */
+ @Override public Matrix times(double x) {
+ return mapOverValues(v -> v * x);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @SuppressWarnings({"unchecked"})
+ @Override public Matrix times(final Matrix mtx) {
+ if (mtx == null)
+ throw new IllegalArgumentException("The matrix should be not null.");
+
+ if (columnSize() != mtx.rowSize())
+ throw new CardinalityException(columnSize(), mtx.rowSize());
+
+ SparseBlockDistributedMatrix matrixA = this;
+ SparseBlockDistributedMatrix matrixB = (SparseBlockDistributedMatrix)mtx;
+
+ String cacheName = BlockMatrixStorage.ML_BLOCK_CACHE_NAME;
+ SparseBlockDistributedMatrix matrixC = new SparseBlockDistributedMatrix(matrixA.rowSize(), matrixB.columnSize());
+
+ CacheUtils.bcast(BlockMatrixStorage.ML_BLOCK_CACHE_NAME, () -> {
+ Ignite ignite = Ignition.localIgnite();
+ Affinity affinity = ignite.affinity(cacheName);
+
+ IgniteCache<BlockMatrixKey, BlockEntry> cache = ignite.getOrCreateCache(cacheName);
+ ClusterNode locNode = ignite.cluster().localNode();
+
+ BlockMatrixStorage storageC = matrixC.storage();
+
+ Map<ClusterNode, Collection<BlockMatrixKey>> keysCToNodes = affinity.mapKeysToNodes(storageC.getAllKeys());
+ Collection<BlockMatrixKey> locKeys = keysCToNodes.get(locNode);
+
+ if (locKeys == null)
+ return;
+
+ // compute Cij locally on each node
+ // TODO: IGNITE-5114, exec in parallel
+ locKeys.forEach(key -> {
+ long newBlockId = key.blockId();
+ BlockEntry blockC = null;
+
+ List<BlockEntry> aRow = matrixA.storage().getRowForBlock(newBlockId, storageC);
+ List<BlockEntry> bCol = matrixB.storage().getColForBlock(newBlockId, storageC);
+
+ for (int i = 0; i < aRow.size(); i++) {
+ BlockEntry blockA = aRow.get(i);
+ BlockEntry blockB = bCol.get(i);
+
+ BlockEntry tmpBlock = new BlockEntry(blockA.times(blockB));
+
+ blockC = blockC == null ? tmpBlock : new BlockEntry(blockC.plus(tmpBlock));
+ }
+
+ cache.put(storageC.getCacheKey(newBlockId), blockC);
+ });
+ });
+
+ return matrixC;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Matrix assign(double val) {
+ return mapOverValues(v -> val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Matrix map(IgniteDoubleFunction<Double> fun) {
+ return mapOverValues(fun);
+ }
+
+ /** {@inheritDoc} */
+ @Override public double sum() {
+ return CacheUtils.sparseSum(getUUID(), BlockMatrixStorage.ML_BLOCK_CACHE_NAME);
+ }
+
+ /** {@inheritDoc} */
+ @Override public double maxValue() {
+ return CacheUtils.sparseMax(getUUID(), BlockMatrixStorage.ML_BLOCK_CACHE_NAME);
+ }
+
+ /** {@inheritDoc} */
+ @Override public double minValue() {
+ return CacheUtils.sparseMin(getUUID(), BlockMatrixStorage.ML_BLOCK_CACHE_NAME);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Matrix copy() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Matrix like(int rows, int cols) {
+ return new SparseBlockDistributedMatrix(rows, cols);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Vector likeVector(int crd) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** */
+ private IgniteUuid getUUID() {
+ return ((BlockMatrixStorage)getStorage()).getUUID();
+ }
+
+ /**
+ * @param mapper Mapping function.
+ * @return Matrix with mapped values.
+ */
+ private Matrix mapOverValues(IgniteDoubleFunction<Double> mapper) {
+ CacheUtils.sparseMap(getUUID(), mapper, BlockMatrixStorage.ML_BLOCK_CACHE_NAME);
+
+ return this;
+ }
+
+ /**
+ *
+ */
+ private BlockMatrixStorage storage() {
+ return (BlockMatrixStorage)getStorage();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
index df2ddc4..a86db95 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
@@ -23,7 +23,6 @@ import org.apache.ignite.ml.math.StorageConstants;
import org.apache.ignite.ml.math.Vector;
import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException;
import org.apache.ignite.ml.math.functions.IgniteDoubleFunction;
-import org.apache.ignite.ml.math.functions.IgniteFunction;
import org.apache.ignite.ml.math.impls.CacheUtils;
import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage;
@@ -61,10 +60,7 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo
setStorage(new SparseDistributedMatrixStorage(rows, cols, stoMode, acsMode));
}
- /**
- *
- *
- */
+ /** */
private SparseDistributedMatrixStorage storage() {
return (SparseDistributedMatrixStorage)getStorage();
}
@@ -75,7 +71,7 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo
* @param d Value to divide to.
*/
@Override public Matrix divide(double d) {
- return mapOverValues((Double v) -> v / d);
+ return mapOverValues(v -> v / d);
}
/**
@@ -84,7 +80,7 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo
* @param x Value to add.
*/
@Override public Matrix plus(double x) {
- return mapOverValues((Double v) -> v + x);
+ return mapOverValues(v -> v + x);
}
/**
@@ -93,42 +89,42 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo
* @param x Value to multiply.
*/
@Override public Matrix times(double x) {
- return mapOverValues((Double v) -> v * x);
+ return mapOverValues(v -> v * x);
}
/** {@inheritDoc} */
@Override public Matrix assign(double val) {
- return mapOverValues((Double v) -> val);
+ return mapOverValues(v -> val);
}
/** {@inheritDoc} */
@Override public Matrix map(IgniteDoubleFunction<Double> fun) {
- return mapOverValues(fun::apply);
+ return mapOverValues(fun);
}
/**
* @param mapper Mapping function.
* @return Matrix with mapped values.
*/
- private Matrix mapOverValues(IgniteFunction<Double, Double> mapper) {
- CacheUtils.sparseMap(getUUID(), mapper);
+ private Matrix mapOverValues(IgniteDoubleFunction<Double> mapper) {
+ CacheUtils.sparseMap(getUUID(), mapper, SparseDistributedMatrixStorage.ML_CACHE_NAME);
return this;
}
/** {@inheritDoc} */
@Override public double sum() {
- return CacheUtils.sparseSum(getUUID());
+ return CacheUtils.sparseSum(getUUID(), SparseDistributedMatrixStorage.ML_CACHE_NAME);
}
/** {@inheritDoc} */
@Override public double maxValue() {
- return CacheUtils.sparseMax(getUUID());
+ return CacheUtils.sparseMax(getUUID(), SparseDistributedMatrixStorage.ML_CACHE_NAME);
}
/** {@inheritDoc} */
@Override public double minValue() {
- return CacheUtils.sparseMin(getUUID());
+ return CacheUtils.sparseMin(getUUID(), SparseDistributedMatrixStorage.ML_CACHE_NAME);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BaseBlockMatrixKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BaseBlockMatrixKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BaseBlockMatrixKey.java
new file mode 100644
index 0000000..74ddfe5
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BaseBlockMatrixKey.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.impls.storage.matrix;
+
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.ml.math.impls.matrix.SparseBlockDistributedMatrix;
+
+/**
+ * Cache key for blocks in {@link SparseBlockDistributedMatrix}.
+ */
+public interface BaseBlockMatrixKey {
+ /**
+ * @return block id.
+ */
+ public long blockId();
+
+ /**
+ * @return matrix id.
+ */
+ public IgniteUuid matrixId();
+
+ /**
+ * @return affinity key.
+ */
+ public IgniteUuid affinityKey();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BlockMatrixKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BlockMatrixKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BlockMatrixKey.java
new file mode 100644
index 0000000..3749f44
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BlockMatrixKey.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.impls.storage.matrix;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.ml.math.impls.matrix.BlockEntry;
+import org.apache.ignite.ml.math.impls.matrix.SparseBlockDistributedMatrix;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Key implementation for {@link BlockEntry} used for {@link SparseBlockDistributedMatrix}.
+ */
+public class BlockMatrixKey implements BaseBlockMatrixKey, Externalizable, Binarylizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+ /** Block ID */
+ private long blockId;
+ /** Matrix ID */
+ private IgniteUuid matrixUuid;
+ /** Block affinity key. */
+ private IgniteUuid affinityKey;
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ public BlockMatrixKey() {
+ // No-op.
+ }
+
+ /**
+ * Construct matrix block key.
+ *
+ * @param blockId Block id.
+ * @param matrixUuid Matrix uuid.
+ * @param affinityKey Affinity key.
+ */
+ public BlockMatrixKey(long blockId, IgniteUuid matrixUuid, @Nullable IgniteUuid affinityKey) {
+ assert blockId >= 0;
+ assert matrixUuid != null;
+
+ this.blockId = blockId;
+ this.matrixUuid = matrixUuid;
+ this.affinityKey = affinityKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long blockId() {
+ return blockId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid matrixId() {
+ return matrixUuid;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid affinityKey() {
+ return affinityKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeGridUuid(out, matrixUuid);
+ U.writeGridUuid(out, affinityKey);
+ out.writeLong(blockId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ matrixUuid = U.readGridUuid(in);
+ affinityKey = U.readGridUuid(in);
+ blockId = in.readLong();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ BinaryRawWriter out = writer.rawWriter();
+
+ BinaryUtils.writeIgniteUuid(out, matrixUuid);
+ BinaryUtils.writeIgniteUuid(out, affinityKey);
+ out.writeLong(blockId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ BinaryRawReader in = reader.rawReader();
+
+ matrixUuid = BinaryUtils.readIgniteUuid(in);
+ affinityKey = BinaryUtils.readIgniteUuid(in);
+ blockId = in.readLong();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return matrixUuid.hashCode() + (int)(blockId ^ (blockId >>> 32));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (obj == this)
+ return true;
+
+ if (obj == null || obj.getClass() != getClass())
+ return false;
+
+ BlockMatrixKey that = (BlockMatrixKey)obj;
+
+ return blockId == that.blockId && matrixUuid.equals(that.matrixUuid) && F.eq(affinityKey, that.affinityKey);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(BlockMatrixKey.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BlockMatrixStorage.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BlockMatrixStorage.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BlockMatrixStorage.java
new file mode 100644
index 0000000..6640e5a
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BlockMatrixStorage.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.impls.storage.matrix;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.ml.math.MatrixStorage;
+import org.apache.ignite.ml.math.StorageConstants;
+import org.apache.ignite.ml.math.impls.CacheUtils;
+import org.apache.ignite.ml.math.impls.matrix.BlockEntry;
+import org.apache.ignite.ml.math.impls.matrix.SparseBlockDistributedMatrix;
+
+import static org.apache.ignite.ml.math.impls.matrix.BlockEntry.MAX_BLOCK_SIZE;
+
+/**
+ * Storage for {@link SparseBlockDistributedMatrix}.
+ */
+public class BlockMatrixStorage extends CacheUtils implements MatrixStorage, StorageConstants {
+ /** Cache name used for all instances of {@link BlockMatrixStorage}. */
+ public static final String ML_BLOCK_CACHE_NAME = "ML_BLOCK_SPARSE_MATRICES_CONTAINER";
+ /** Number of blocks along one matrix column, i.e. the number of block rows (derived from {@code rows}). */
+ private int blocksInCol;
+ /** Number of blocks along one matrix row, i.e. the number of block columns (derived from {@code cols}). */
+ private int blocksInRow;
+ /** Amount of rows in the matrix. */
+ private int rows;
+ /** Amount of columns in the matrix. */
+ private int cols;
+ /** Matrix uuid. */
+ private IgniteUuid uuid;
+ /** Maximum number of rows and of columns in a single block. Original note: block size about 8 KB of data. */
+ private int maxBlockEdge = MAX_BLOCK_SIZE;
+
+ /** Actual distributed storage. */
+ private IgniteCache<
+ BlockMatrixKey /* Matrix block number with uuid. */,
+ BlockEntry /* Block of matrix, local sparse matrix. */
+ > cache = null;
+
+ /**
+ * Creates an uninitialized storage: no cache is obtained and no UUID is assigned.
+ * Presumably meant for deserialization, where {@link #readExternal(ObjectInput)} fills the fields - confirm.
+ */
+ public BlockMatrixStorage() {
+ // No-op.
+ }
+
+    /**
+     * Creates a storage for a {@code rows} x {@code cols} matrix that is split into blocks of at most
+     * {@code maxBlockEdge} elements per side, obtains the shared block cache and assigns a new UUID.
+     *
+     * @param rows Amount of rows in the matrix.
+     * @param cols Amount of columns in the matrix.
+     */
+    public BlockMatrixStorage(int rows, int cols) {
+        assert rows > 0;
+        assert cols > 0;
+
+        this.rows = rows;
+        this.cols = cols;
+
+        // A trailing partial block still counts as a block.
+        int fullBlocksInRow = cols / maxBlockEdge;
+        int fullBlocksInCol = rows / maxBlockEdge;
+
+        blocksInRow = cols % maxBlockEdge != 0 ? fullBlocksInRow + 1 : fullBlocksInRow;
+        blocksInCol = rows % maxBlockEdge != 0 ? fullBlocksInCol + 1 : fullBlocksInCol;
+
+        cache = newCache();
+
+        uuid = IgniteUuid.randomUuid();
+    }
+
+ /**
+ * @return Cache that holds the blocks of this matrix; {@code null} while the storage has only been created
+ * through the no-arg constructor.
+ */
+ public IgniteCache<BlockMatrixKey, BlockEntry> cache() {
+ return cache;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates to {@code matrixGet}, which performs the read as a compute call on the cluster.
+ */
+ @Override public double get(int x, int y) {
+ return matrixGet(x, y);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates to {@code matrixSet}, which performs the update as a compute job on the cluster.
+ */
+ @Override public void set(int x, int y, double v) {
+ matrixSet(x, y, v);
+ }
+
+ /** {@inheritDoc} Returns the column count given to the constructor or restored by {@code readExternal}. */
+ @Override public int columnSize() {
+ return cols;
+ }
+
+ /** {@inheritDoc} Returns the row count given to the constructor or restored by {@code readExternal}. */
+ @Override public int rowSize() {
+ return rows;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes the dimensions, the block counts, the matrix UUID and the name of the cache. Neither the cache
+ * contents nor {@code maxBlockEdge} are written.
+ */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(rows);
+ out.writeInt(cols);
+ out.writeInt(blocksInRow);
+ out.writeInt(blocksInCol);
+ U.writeGridUuid(out, uuid);
+ out.writeUTF(cache.getName());
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reads the fields in the order {@code writeExternal} writes them and then obtains the cache by the stored
+ * name through {@code ignite().getOrCreateCache}. {@code maxBlockEdge} is not read from the stream.
+ */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ rows = in.readInt();
+ cols = in.readInt();
+ blocksInRow = in.readInt();
+ blocksInCol = in.readInt();
+ uuid = U.readGridUuid(in);
+ cache = ignite().getOrCreateCache(in.readUTF());
+ }
+
+ /** {@inheritDoc} Always {@code false} for this storage. */
+ @Override public boolean isSequentialAccess() {
+ return false;
+ }
+
+ /** {@inheritDoc} Always {@code false} for this storage. */
+ @Override public boolean isDense() {
+ return false;
+ }
+
+ /** {@inheritDoc} Always {@code true} for this storage. */
+ @Override public boolean isRandomAccess() {
+ return true;
+ }
+
+ /** {@inheritDoc} Always {@code true} for this storage. */
+ @Override public boolean isDistributed() {
+ return true;
+ }
+
+ /** {@inheritDoc} Always {@code false} for this storage. */
+ @Override public boolean isArrayBased() {
+ return false;
+ }
+
+    /**
+     * Clears the blocks of this matrix from the cache. Only keys built with this storage's UUID are cleared,
+     * so blocks of other matrices that live in the same cache are left alone.
+     */
+    @Override public void destroy() {
+        // Highest id that set()/get() can produce. Block ids never decrease when an index grows, so it is
+        // the id of the last element, with the indices in the same (x, y) order matrixSet()/matrixGet() use.
+        // Passing the sizes in swapped order, as before, missed the highest ids of non-square block grids.
+        long maxBlockId = getBlockId(rows - 1, cols - 1);
+
+        Set<BlockMatrixKey> keys = LongStream.rangeClosed(0, maxBlockId)
+            .mapToObj(this::getCacheKey)
+            .collect(Collectors.toSet());
+
+        cache.clearAll(keys);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Folds the column count, the row count, the UUID hash and the cache hash, in that order, with base 37.
+     */
+    @Override public int hashCode() {
+        int hash = 37 + cols;
+
+        hash = 37 * hash + rows;
+        hash = 37 * hash + uuid.hashCode();
+        hash = 37 * hash + cache.hashCode();
+
+        return hash;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Storages are equal when they have the same class, dimensions, UUID and cache (both caches may be
+     * {@code null}).
+     */
+    @Override public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+
+        if (obj == null || getClass() != obj.getClass())
+            return false;
+
+        BlockMatrixStorage other = (BlockMatrixStorage)obj;
+
+        if (rows != other.rows || cols != other.cols)
+            return false;
+
+        if (!uuid.equals(other.uuid))
+            return false;
+
+        if (cache == null)
+            return other.cache == null;
+
+        return cache.equals(other.cache);
+    }
+
+ /**
+ * Get storage UUID.
+ *
+ * @return Storage UUID; {@code null} while the storage has only been created through the no-arg constructor.
+ */
+ public IgniteUuid getUUID() {
+ return uuid;
+ }
+
+ /**
+ * Build the cache key for the given block id.
+ *
+ * @param blockId Block id.
+ * @return New key made of the block id, this storage's UUID and the affinity key returned by
+ * {@code getAffinityKey(blockId)}.
+ */
+ public BlockMatrixKey getCacheKey(long blockId) {
+ return new BlockMatrixKey(blockId, uuid, getAffinityKey(blockId));
+ }
+
+ /**
+ * Get the block row of this storage that corresponds to block {@code blockId} of {@code storageC}.
+ * <p>
+ * The id is split into a block row and a block column using the block counts of {@code storageC}, then mapped
+ * to a block id of this storage; a block column beyond this storage's last one is clamped to the last one.
+ * <p>
+ * NOTE(review): the block row is computed by dividing by {@code storageC.blocksInCol}, while the single-argument
+ * {@code getRowForBlock} treats ids as row-major with {@code blocksInRow} ids per row. Both agree only for a
+ * square block grid - confirm the divisor.
+ *
+ * @param blockId block id.
+ * @param storageC result storage.
+ * @return The list of block entries.
+ */
+ public List<BlockEntry> getRowForBlock(long blockId, BlockMatrixStorage storageC) {
+ long blockRow = blockId / storageC.blocksInCol;
+ long blockCol = blockId % storageC.blocksInRow;
+
+ long locBlock = this.blocksInRow * (blockRow) + (blockCol >= this.blocksInRow ? (blocksInRow - 1) : blockCol);
+
+ return getRowForBlock(locBlock);
+ }
+
+ /**
+ * Get the block column of this storage that corresponds to block {@code blockId} of {@code storageC}.
+ * <p>
+ * The id is split into a block row and a block column using the block counts of {@code storageC}, then mapped
+ * to a block id of this storage; a block column beyond this storage's last one is clamped to the last one.
+ * <p>
+ * NOTE(review): the block row is computed by dividing by {@code storageC.blocksInCol}, while the single-argument
+ * {@code getColForBlock} treats ids as row-major with {@code blocksInRow} ids per row. Both agree only for a
+ * square block grid - confirm the divisor.
+ *
+ * @param blockId block id.
+ * @param storageC result storage.
+ * @return The list of block entries.
+ */
+ public List<BlockEntry> getColForBlock(long blockId, BlockMatrixStorage storageC) {
+ long blockRow = blockId / storageC.blocksInCol;
+ long blockCol = blockId % storageC.blocksInRow;
+
+ long locBlock = this.blocksInRow * (blockRow) + (blockCol >= this.blocksInRow ? (blocksInRow - 1) : blockCol);
+
+ return getColForBlock(locBlock);
+ }
+
+    /**
+     * Builds the cache keys for the block ids {@code 0 .. numberOfBlocks() - 1}, in ascending id order.
+     *
+     * @return Keys of this matrix storage.
+     */
+    public Collection<BlockMatrixKey> getAllKeys() {
+        return LongStream.range(0, numberOfBlocks())
+            .mapToObj(this::getCacheKey)
+            .collect(Collectors.toCollection(LinkedList::new));
+    }
+
+    /**
+     * Collects all blocks of the block row that contains {@code blockId}, in ascending id order.
+     *
+     * @param blockId Row-major block id.
+     * @return Blocks of that block row.
+     */
+    private List<BlockEntry> getRowForBlock(long blockId) {
+        // A block row starts at the nearest lower multiple of blocksInRow.
+        long first = blockId < blocksInRow ? 0 : blockId - blockId % blocksInRow;
+        long pastLast = first + blocksInRow;
+
+        List<BlockEntry> row = new LinkedList<>();
+
+        for (long id = first; id < pastLast; id++)
+            row.add(getEntryById(id));
+
+        return row;
+    }
+
+    /**
+     * Collects all blocks of the block column that contains {@code blockId}, from the first block row down.
+     *
+     * @param blockId Row-major block id.
+     * @return Blocks of that block column.
+     */
+    private List<BlockEntry> getColForBlock(long blockId) {
+        // Blocks of one column are blocksInRow ids apart.
+        long top = blockId % blocksInRow;
+        long bottom = top + blocksInRow * (blocksInCol - 1);
+
+        List<BlockEntry> col = new LinkedList<>();
+
+        long id = top;
+
+        while (id <= bottom) {
+            col.add(getEntryById(id));
+
+            id += blocksInRow;
+        }
+
+        return col;
+    }
+
+    /**
+     * Returns the block with the given id, or a new empty block of the matching size when the cache holds none.
+     * A newly created block is not put into the cache.
+     *
+     * @param blockId Row-major block id.
+     * @return Cached block or a fresh empty one.
+     */
+    private BlockEntry getEntryById(long blockId) {
+        BlockMatrixKey key = getCacheKey(blockId);
+
+        // Peek at the locally held entry first, then fall back to a regular cache read.
+        BlockEntry entry = cache.localPeek(key);
+
+        if (entry == null)
+            entry = cache.get(key);
+
+        if (entry != null)
+            return entry;
+
+        boolean isLastRow = blockId >= (long)blocksInRow * (blocksInCol - 1);
+
+        // The last block of each block row has an id one below a multiple of blocksInRow. Block 0 is such a
+        // block only when a row consists of a single block, so it must not be special-cased: the previous
+        // special case treated block 0 as a last-column block in every matrix.
+        boolean isLastCol = (blockId + 1) % blocksInRow == 0;
+
+        int rowsRest = rows % maxBlockEdge;
+        int colsRest = cols % maxBlockEdge;
+
+        // Only trailing blocks of a dimension that is not a multiple of the block edge are smaller.
+        return new BlockEntry(
+            isLastRow && rowsRest != 0 ? rowsRest : maxBlockEdge,
+            isLastCol && colsRest != 0 ? colsRest : maxBlockEdge);
+    }
+
+    /**
+     * @return Total number of blocks of the matrix: block rows multiplied by block columns.
+     */
+    private long numberOfBlocks() {
+        // Multiply as long: for very large matrices the product does not fit into an int.
+        return (long)blockShift(rowSize()) * blockShift(columnSize());
+    }
+
+ /**
+ * TODO: IGNITE-5646, WIP
+ *
+ * Get affinity key for the given id. Not implemented yet: the id is ignored and {@code null} is returned.
+ *
+ * @param id Block id (currently unused).
+ * @return Always {@code null}.
+ */
+ private IgniteUuid getAffinityKey(long id) {
+ return null;
+ }
+
+ /**
+ * Distributed matrix set: loads the block that contains the element (creating an empty one when the cache
+ * has none), updates the element inside the block and puts the whole block back into the cache.
+ * <p>
+ * NOTE(review): the read-modify-write of the block uses no lock or entry processor, so concurrent writes to
+ * the same block could overwrite each other - confirm callers do not depend on that.
+ * NOTE(review): the cluster group is resolved for the numeric block id, whereas the cache key is a
+ * {@link BlockMatrixKey}; whether both map to the same node is not visible here - confirm.
+ *
+ * @param a First element index, passed through from {@code set(x, y, v)} as {@code x}.
+ * @param b Second element index, passed through from {@code set(x, y, v)} as {@code y}.
+ * @param v New value to set.
+ */
+ private void matrixSet(int a, int b, double v) {
+ long id = getBlockId(a, b);
+ // Run the update as a compute job on the cluster group chosen for this block id.
+ ignite().compute(groupForKey(ML_BLOCK_CACHE_NAME, id)).run(() -> {
+ IgniteCache<BlockMatrixKey, BlockEntry> cache = Ignition.localIgnite().getOrCreateCache(ML_BLOCK_CACHE_NAME);
+
+ BlockMatrixKey key = getCacheKey(getBlockId(a, b));
+
+ // Peek at the primary copy held by the executing node first.
+ BlockEntry block = cache.localPeek(key, CachePeekMode.PRIMARY);
+
+ if (block == null)
+ block = cache.get(key); // Fall back to a regular cache read.
+
+ if (block == null)
+ block = initBlockFor(a, b);
+
+ block.set(a % block.rowSize(), b % block.columnSize(), v); // In-block position: index modulo block size.
+
+ // Store the updated block.
+ cache.put(key, block);
+ });
+ }
+
+    /**
+     * Maps a pair of element indices to the id of the block that contains the element.
+     * <p>
+     * NOTE(review): the {@code y}-derived block index is multiplied by the number of blocks per matrix row.
+     * {@code matrixSet}/{@code matrixGet} pass the indices of {@code set(x, y)}/{@code get(x, y)} unchanged; if
+     * {@code x} is the row index there, blocks are numbered column by column, which differs from the row-major
+     * numbering the single-argument {@code getRowForBlock}/{@code getColForBlock} assume for non-square block
+     * grids - confirm the intended order.
+     *
+     * @param x First element index.
+     * @param y Second element index.
+     * @return Block id.
+     */
+    private long getBlockId(int x, int y) {
+        // Widen before multiplying so that the id of a block in a very large matrix cannot overflow an int.
+        return (long)(y / maxBlockEdge) * blockShift(cols) + x / maxBlockEdge;
+    }
+
+    /**
+     * Creates an empty block sized for the block that contains the element {@code (x, y)}.
+     *
+     * @param x Index, along the {@code rows} dimension, of any element of the block.
+     * @param y Index, along the {@code cols} dimension, of any element of the block.
+     * @return New block with {@code maxBlockEdge} elements per side, cut at the matrix border for trailing blocks.
+     */
+    private BlockEntry initBlockFor(int x, int y) {
+        // Size the block from its origin, not from the element that happens to be written first. Measuring
+        // from the element itself produced an undersized block whenever the first write was away from the
+        // block's first row or column and closer than maxBlockEdge to the matrix border.
+        int rowOrigin = x - x % maxBlockEdge;
+        int colOrigin = y - y % maxBlockEdge;
+
+        int blockRows = Math.min(maxBlockEdge, rows - rowOrigin);
+        int blockCols = Math.min(maxBlockEdge, cols - colOrigin);
+
+        return new BlockEntry(blockRows, blockCols);
+    }
+
+    /**
+     * Counts the blocks needed to cover {@code i} elements; a trailing partial block counts as one block.
+     *
+     * @param i Number of elements along one dimension.
+     * @return Number of blocks along that dimension.
+     */
+    private int blockShift(int i) {
+        int fullBlocks = i / maxBlockEdge;
+
+        if (i % maxBlockEdge > 0)
+            return fullBlocks + 1;
+
+        return fullBlocks;
+    }
+
+ /**
+ * Distributed matrix get: reads the block that contains the element and returns the element's value.
+ * An element whose block is not in the cache reads as {@code 0.0}.
+ * <p>
+ * NOTE(review): the cluster group is resolved for the numeric block id, whereas the cache key is a
+ * {@link BlockMatrixKey}; whether both map to the same node is not visible here - confirm.
+ *
+ * @param a First element index, passed through from {@code get(x, y)} as {@code x}.
+ * @param b Second element index, passed through from {@code get(x, y)} as {@code y}.
+ * @return Matrix value at (a, b) index.
+ */
+ private double matrixGet(int a, int b) {
+ // Run the read as a compute call on the cluster group chosen for this block id.
+ return ignite().compute(groupForKey(ML_BLOCK_CACHE_NAME, getBlockId(a, b))).call(() -> {
+ IgniteCache<BlockMatrixKey, BlockEntry> cache = Ignition.localIgnite().getOrCreateCache(ML_BLOCK_CACHE_NAME);
+
+ BlockMatrixKey key = getCacheKey(getBlockId(a, b));
+
+ // Peek at the primary copy held by the executing node first.
+ BlockEntry block = cache.localPeek(key, CachePeekMode.PRIMARY);
+
+ if (block == null)
+ block = cache.get(key); // Fall back to a regular cache read.
+
+ return block == null ? 0.0 : block.get(a % block.rowSize(), b % block.columnSize());
+ });
+ }
+
+ /**
+ * Create new ML cache if needed. All storages use the same cache name, so an already existing cache is
+ * returned as is by {@code getOrCreateCache}.
+ *
+ * @return Block cache obtained from the local Ignite instance.
+ */
+ private IgniteCache<BlockMatrixKey, BlockEntry> newCache() {
+ CacheConfiguration<BlockMatrixKey, BlockEntry> cfg = new CacheConfiguration<>();
+
+ // Write to primary.
+ cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+
+ // Atomic (non-transactional) cache.
+ cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+
+ // No eviction.
+ cfg.setEvictionPolicy(null);
+
+ // No copying of values.
+ cfg.setCopyOnRead(false);
+
+ // Cache is partitioned.
+ cfg.setCacheMode(CacheMode.PARTITIONED);
+
+ // Fixed cache name shared by all block matrix storages.
+ cfg.setName(ML_BLOCK_CACHE_NAME);
+
+ return Ignition.localIgnite().getOrCreateCache(cfg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/vector/SparseLocalOnHeapVectorStorage.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/vector/SparseLocalOnHeapVectorStorage.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/vector/SparseLocalOnHeapVectorStorage.java
index f2efe74..5145376 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/vector/SparseLocalOnHeapVectorStorage.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/vector/SparseLocalOnHeapVectorStorage.java
@@ -46,9 +46,7 @@ public class SparseLocalOnHeapVectorStorage implements VectorStorage, StorageCon
// No-op.
}
- /**
- * @param map
- */
+ /** */
public SparseLocalOnHeapVectorStorage(Map<Integer, Double> map, boolean copy) {
assert map.size() > 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/Variance.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/Variance.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/Variance.java
index e406b5b..525e6e9 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/Variance.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/Variance.java
@@ -30,6 +30,7 @@ public class Variance {
/** */
private double m2;
+ /** */
public Variance() {
mean = 0;
n = 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/package-info.java
new file mode 100644
index 0000000..7b65fce
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains statistics utilities for the ML math module.
+ */
+package org.apache.ignite.ml.math.statistics;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MapUtil.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MapUtil.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MapUtil.java
index 6c25f0e..9190901 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MapUtil.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MapUtil.java
@@ -25,7 +25,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
- *
+ * Some {@link Map} related utils.
*/
public class MapUtil {
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/util/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/util/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/util/package-info.java
new file mode 100644
index 0000000..2507ee4
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/util/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains helper utilities used by the ML math module.
+ */
+package org.apache.ignite.ml.math.util;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/package-info.java
new file mode 100644
index 0000000..779581b
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Root ML package.
+ */
+package org.apache.ignite.ml;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplDistributedTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplDistributedTestSuite.java b/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplDistributedTestSuite.java
index 9899d3b..5dc860c 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplDistributedTestSuite.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplDistributedTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.ignite.ml.math;
import org.apache.ignite.ml.math.impls.matrix.CacheMatrixTest;
+import org.apache.ignite.ml.math.impls.matrix.SparseDistributedBlockMatrixTest;
import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrixTest;
import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorageTest;
import org.apache.ignite.ml.math.impls.vector.CacheVectorTest;
@@ -33,6 +34,7 @@ import org.junit.runners.Suite;
CacheMatrixTest.class,
SparseDistributedMatrixStorageTest.class,
SparseDistributedMatrixTest.class,
+ SparseDistributedBlockMatrixTest.class
})
public class MathImplDistributedTestSuite {
// No-op.
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedBlockMatrixTest.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedBlockMatrixTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedBlockMatrixTest.java
new file mode 100644
index 0000000..1228f05
--- /dev/null
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedBlockMatrixTest.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.impls.matrix;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.ml.math.Matrix;
+import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException;
+import org.apache.ignite.ml.math.impls.MathTestConstants;
+import org.apache.ignite.ml.math.impls.storage.matrix.BlockMatrixKey;
+import org.apache.ignite.ml.math.impls.storage.matrix.BlockMatrixStorage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testframework.junits.common.GridCommonTest;
+
+import static org.apache.ignite.ml.math.impls.MathTestConstants.UNEXPECTED_VAL;
+
+/**
+ * Tests for {@link SparseBlockDistributedMatrix}.
+ */
+@GridCommonTest(group = "Distributed Models")
+public class SparseDistributedBlockMatrixTest extends GridCommonAbstractTest {
+ /** Number of nodes in grid */
+ private static final int NODE_COUNT = 3;
+ /** Precision. */
+ private static final double PRECISION = 0.0;
+ /** Grid instance. */
+ private Ignite ignite;
+ /** Matrix rows */
+ private final int rows = MathTestConstants.STORAGE_SIZE;
+ /** Matrix cols */
+ private final int cols = MathTestConstants.STORAGE_SIZE;
+ /** Matrix for tests */
+ private SparseBlockDistributedMatrix cacheMatrix;
+
+ /**
+ * Default constructor. Passes {@code false} to the parent constructor.
+ * NOTE(review): presumably this tells the base class not to start a grid on its own, since the grids are
+ * started in {@code beforeTestsStarted()} - confirm against {@code GridCommonAbstractTest}.
+ */
+ public SparseDistributedBlockMatrixTest() {
+ super(false);
+ }
+
+ /** {@inheritDoc} Starts {@code NODE_COUNT} grids with indices {@code 1 .. NODE_COUNT}. */
+ @Override protected void beforeTestsStarted() throws Exception {
+ for (int i = 1; i <= NODE_COUNT; i++)
+ startGrid(i);
+ }
+
+ /** {@inheritDoc} Stops every grid started for this test class. */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Uses the grid with the highest index as the test's Ignite instance.
+ * NOTE(review): peer class loading is switched on through the configuration of an already started node;
+ * whether that has any effect at this point is not visible here - confirm.
+ */
+ @Override protected void beforeTest() throws Exception {
+ ignite = grid(NODE_COUNT);
+
+ ignite.configuration().setPeerClassLoadingEnabled(true);
+ }
+
+ /** {@inheritDoc} Destroys the matrix stored in {@code cacheMatrix}, if a test created one. */
+ @Override protected void afterTest() throws Exception {
+ if (cacheMatrix != null) {
+ cacheMatrix.destroy();
+ cacheMatrix = null;
+ }
+ }
+
+ /**
+ * Writes a random value to every element and reads it back right away, expecting exactly the value written
+ * ({@code PRECISION} is zero).
+ */
+ public void testGetSet() throws Exception {
+ IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+ cacheMatrix = new SparseBlockDistributedMatrix(rows, cols);
+
+ for (int i = 0; i < rows; i++) {
+ for (int j = 0; j < cols; j++) {
+ double v = Math.random();
+ cacheMatrix.set(i, j, v);
+
+ assertEquals("Unexpected value for matrix element["+ i +" " + j + "]", v, cacheMatrix.get(i, j), PRECISION);
+ }
+ }
+ }
+
+    /**
+     * Serializes a matrix with Java serialization and checks that the restored instance equals the original
+     * and returns the value that was set before serialization.
+     */
+    public void testExternalize() throws IOException, ClassNotFoundException {
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        cacheMatrix = new SparseBlockDistributedMatrix(rows, cols);
+
+        cacheMatrix.set(1, 1, 1.0);
+
+        ByteArrayOutputStream byteArrOutputStream = new ByteArrayOutputStream();
+
+        // Closing the object stream flushes everything into the byte array before it is read back.
+        try (ObjectOutputStream objOutputStream = new ObjectOutputStream(byteArrOutputStream)) {
+            objOutputStream.writeObject(cacheMatrix);
+        }
+
+        SparseBlockDistributedMatrix objRestored;
+
+        try (ObjectInputStream objInputStream =
+            new ObjectInputStream(new ByteArrayInputStream(byteArrOutputStream.toByteArray()))) {
+            objRestored = (SparseBlockDistributedMatrix)objInputStream.readObject();
+        }
+
+        assertTrue(MathTestConstants.VAL_NOT_EQUALS, cacheMatrix.equals(objRestored));
+        // Expected value first, actual value second, so that a failure message reads correctly.
+        assertEquals(MathTestConstants.VAL_NOT_EQUALS, 1.0, objRestored.get(1, 1), PRECISION);
+    }
+
+ /**
+ * Test simple math: applies {@code assign}, {@code plus}, {@code times} and {@code divide} one after another,
+ * checks every element after each step, and finally checks that the sum equals the element count.
+ * The return values of the operations are not used, so the checks rely on the operations changing
+ * {@code cacheMatrix} itself.
+ */
+ public void testMath() {
+ IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+ cacheMatrix = new SparseBlockDistributedMatrix(rows, cols);
+ initMtx(cacheMatrix);
+
+ cacheMatrix.assign(2.0);
+ for (int i = 0; i < cacheMatrix.rowSize(); i++)
+ for (int j = 0; j < cacheMatrix.columnSize(); j++)
+ assertEquals(UNEXPECTED_VAL, 2.0, cacheMatrix.get(i, j), PRECISION);
+
+ cacheMatrix.plus(3.0);
+ for (int i = 0; i < cacheMatrix.rowSize(); i++)
+ for (int j = 0; j < cacheMatrix.columnSize(); j++)
+ assertEquals(UNEXPECTED_VAL, 5.0, cacheMatrix.get(i, j), PRECISION);
+
+ cacheMatrix.times(2.0);
+ for (int i = 0; i < cacheMatrix.rowSize(); i++)
+ for (int j = 0; j < cacheMatrix.columnSize(); j++)
+ assertEquals(UNEXPECTED_VAL, 10.0, cacheMatrix.get(i, j), PRECISION);
+
+ cacheMatrix.divide(10.0);
+ for (int i = 0; i < cacheMatrix.rowSize(); i++)
+ for (int j = 0; j < cacheMatrix.columnSize(); j++)
+ assertEquals(UNEXPECTED_VAL, 1.0, cacheMatrix.get(i, j), PRECISION);
+
+ assertEquals(UNEXPECTED_VAL, cacheMatrix.rowSize() * cacheMatrix.columnSize(), cacheMatrix.sum(), PRECISION);
+ }
+
+ /**
+ * Checks {@code minValue()} and {@code maxValue()} for three fillings of the matrix: strictly positive values
+ * {@code 1 .. rows * cols}, their negations, and values {@code 0 .. rows * cols - 1}.
+ */
+ public void testMinMax() {
+ IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+ cacheMatrix = new SparseBlockDistributedMatrix(rows, cols);
+
+ for (int i = 0; i < cacheMatrix.rowSize(); i++)
+ for (int j = 0; j < cacheMatrix.columnSize(); j++)
+ cacheMatrix.set(i, j, i * cols + j + 1);
+
+ assertEquals(UNEXPECTED_VAL, 1.0, cacheMatrix.minValue(), PRECISION);
+ assertEquals(UNEXPECTED_VAL, rows * cols, cacheMatrix.maxValue(), PRECISION);
+
+ for (int i = 0; i < cacheMatrix.rowSize(); i++)
+ for (int j = 0; j < cacheMatrix.columnSize(); j++)
+ cacheMatrix.set(i, j, -1.0 * (i * cols + j + 1));
+
+ assertEquals(UNEXPECTED_VAL, -rows * cols, cacheMatrix.minValue(), PRECISION);
+ assertEquals(UNEXPECTED_VAL, -1.0, cacheMatrix.maxValue(), PRECISION);
+
+ for (int i = 0; i < cacheMatrix.rowSize(); i++)
+ for (int j = 0; j < cacheMatrix.columnSize(); j++)
+ cacheMatrix.set(i, j, i * cols + j);
+
+ assertEquals(UNEXPECTED_VAL, 0.0, cacheMatrix.minValue(), PRECISION);
+ assertEquals(UNEXPECTED_VAL, rows * cols - 1.0, cacheMatrix.maxValue(), PRECISION);
+ }
+
+ /**
+ * Maps every element to the constant {@code 100.0} and checks that each element then reads {@code 100.0}.
+ * The return value of {@code map} is not used, so the check relies on {@code map} changing the matrix itself.
+ */
+ public void testMap() {
+ IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+ cacheMatrix = new SparseBlockDistributedMatrix(rows, cols);
+ initMtx(cacheMatrix);
+
+ cacheMatrix.map(i -> 100.0);
+ for (int i = 0; i < cacheMatrix.rowSize(); i++)
+ for (int j = 0; j < cacheMatrix.columnSize(); j++)
+ assertEquals(UNEXPECTED_VAL, 100.0, cacheMatrix.get(i, j), PRECISION);
+ }
+
+    /** Checks that {@code copy()} is rejected with {@link UnsupportedOperationException}. */
+    public void testCopy() {
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        cacheMatrix = new SparseBlockDistributedMatrix(rows, cols);
+
+        boolean rejected = false;
+
+        try {
+            cacheMatrix.copy();
+        }
+        catch (UnsupportedOperationException ignored) {
+            // This is the expected outcome.
+            rejected = true;
+        }
+
+        if (!rejected)
+            fail("UnsupportedOperationException expected.");
+    }
+
+    /**
+     * Checks that two matrices keep their blocks in the shared ML block cache and that destroying one matrix
+     * leaves the blocks of the other one in place.
+     */
+    public void testCacheBehaviour(){
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        SparseBlockDistributedMatrix cacheMatrix1 = new SparseBlockDistributedMatrix(rows, cols);
+        SparseBlockDistributedMatrix cacheMatrix2 = new SparseBlockDistributedMatrix(rows, cols);
+
+        initMtx(cacheMatrix1);
+        initMtx(cacheMatrix2);
+
+        Collection<String> cacheNames = ignite.cacheNames();
+
+        // JUnit assertions instead of the assert keyword: they are evaluated even when the JVM runs without -ea.
+        assertTrue(cacheNames.contains(BlockMatrixStorage.ML_BLOCK_CACHE_NAME));
+
+        IgniteCache<BlockMatrixKey, Object> cache = ignite.getOrCreateCache(BlockMatrixStorage.ML_BLOCK_CACHE_NAME);
+
+        Set<BlockMatrixKey> keySet1 = buildKeySet(cacheMatrix1);
+        Set<BlockMatrixKey> keySet2 = buildKeySet(cacheMatrix2);
+
+        assertTrue(cache.containsKeys(keySet1));
+        assertTrue(cache.containsKeys(keySet2));
+
+        cacheMatrix2.destroy();
+
+        assertTrue(cache.containsKeys(keySet1));
+        assertFalse(cache.containsKeys(keySet2));
+
+        cacheMatrix1.destroy();
+
+        assertFalse(cache.containsKeys(keySet1));
+    }
+
+ /** Checks that {@code like(1, 1)} returns a non-null matrix. */
+ public void testLike() {
+ IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+ cacheMatrix = new SparseBlockDistributedMatrix(rows, cols);
+
+ assertNotNull(cacheMatrix.like(1, 1));
+ }
+
+    /** Checks that {@code likeVector(1)} is rejected with {@link UnsupportedOperationException}. */
+    public void testLikeVector() {
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        cacheMatrix = new SparseBlockDistributedMatrix(rows, cols);
+
+        boolean rejected = false;
+
+        try {
+            cacheMatrix.likeVector(1);
+        }
+        catch (UnsupportedOperationException ignored) {
+            // This is the expected outcome.
+            rejected = true;
+        }
+
+        if (!rejected)
+            fail("UnsupportedOperationException expected.");
+    }
+
+ /**
+ * Simple test for two square matrices: multiplies two 100 x 100 diagonal matrices with {@code i} at position
+ * {@code (i, i)} and expects {@code i * i} on the diagonal of the product and zero elsewhere.
+ * NOTE(review): the three matrices are local variables and are not destroyed here; {@code afterTest()} only
+ * destroys {@code cacheMatrix} - confirm whether their blocks should be cleaned up.
+ */
+ public void testSquareMatrixTimes(){
+ IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+ int size = 100;
+
+ Matrix cacheMatrix1 = new SparseBlockDistributedMatrix(size, size);
+ Matrix cacheMatrix2 = new SparseBlockDistributedMatrix(size, size);
+
+ for (int i = 0; i < size; i++) {
+ cacheMatrix1.setX(i, i, i);
+ cacheMatrix2.setX(i, i, i);
+ }
+
+ Matrix res = cacheMatrix1.times(cacheMatrix2);
+
+ for(int i = 0; i < size; i++)
+ for(int j = 0; j < size; j++)
+ if (i == j)
+ assertEquals(UNEXPECTED_VAL + " for "+ i +":"+ j, i * i, res.get(i, j), PRECISION);
+ else
+ assertEquals(UNEXPECTED_VAL + " for "+ i +":"+ j, 0, res.get(i, j), PRECISION);
+ }
+
+ /**
+ * Multiplies a {@code size2 x size} matrix by a {@code size x size2} matrix, where both sizes are one element
+ * larger than a whole number of blocks. Both operands hold {@code i} at {@code (i, i)} for {@code i < size}.
+ * Only the leading {@code size x size} part of the product is checked: {@code i * i} on the diagonal and zero
+ * elsewhere.
+ */
+ public void testNonSquareMatrixTimes(){
+ IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+ int size = BlockEntry.MAX_BLOCK_SIZE + 1;
+ int size2 = BlockEntry.MAX_BLOCK_SIZE * 2 + 1;
+
+ Matrix cacheMatrix1 = new SparseBlockDistributedMatrix(size2, size);
+ Matrix cacheMatrix2 = new SparseBlockDistributedMatrix(size, size2);
+
+ for (int i = 0; i < size; i++) {
+ cacheMatrix1.setX(i, i, i);
+ cacheMatrix2.setX(i, i, i);
+ }
+
+ Matrix res = cacheMatrix1.times(cacheMatrix2);
+
+ for(int i = 0; i < size; i++)
+ for(int j = 0; j < size; j++)
+ if (i == j)
+ assertEquals(UNEXPECTED_VAL + " for "+ i +":"+ j, i * i, res.get(i, j), PRECISION);
+ else
+ assertEquals(UNEXPECTED_VAL + " for "+ i +":"+ j, 0, res.get(i, j), PRECISION);
+ }
+
+ /**
+ * Multiplies a {@code size x size2} matrix by a {@code size2 x size} matrix, where both sizes are one element
+ * larger than a whole number of blocks. Both operands hold {@code i} at {@code (i, i)} for {@code i < size}.
+ * The {@code size x size} region of the product is checked: {@code i * i} on the diagonal and zero elsewhere.
+ */
+ public void testNonSquareMatrixTimes2(){
+ IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+ int size = BlockEntry.MAX_BLOCK_SIZE + 1;
+ int size2 = BlockEntry.MAX_BLOCK_SIZE * 2 + 1;
+
+ Matrix cacheMatrix1 = new SparseBlockDistributedMatrix(size, size2);
+ Matrix cacheMatrix2 = new SparseBlockDistributedMatrix(size2, size);
+
+ for (int i = 0; i < size; i++) {
+ cacheMatrix1.setX(i, i, i);
+ cacheMatrix2.setX(i, i, i);
+ }
+
+ Matrix res = cacheMatrix1.times(cacheMatrix2);
+
+ for(int i = 0; i < size; i++)
+ for(int j = 0; j < size; j++)
+ if (i == j)
+ assertEquals(UNEXPECTED_VAL + " for "+ i +":"+ j, i * i, res.get(i, j), PRECISION);
+ else
+ assertEquals(UNEXPECTED_VAL + " for "+ i +":"+ j, 0, res.get(i, j), PRECISION);
+ }
+
+ /**
+ * Sets every element of the given matrix to {@code 1.0}.
+ *
+ * @param m Matrix to fill.
+ */
+ private void initMtx(Matrix m) {
+ for (int i = 0; i < m.rowSize(); i++)
+ for (int j = 0; j < m.columnSize(); j++)
+ m.set(i, j, 1.0);
+ }
+
+    /**
+     * Build key set for SparseBlockDistributedMatrix.
+     *
+     * @param m Matrix backed by a {@link BlockMatrixStorage}.
+     * @return Keys for the block ids {@code 0 .. blocks - 1}, built with the matrix UUID and a {@code null}
+     *      affinity key.
+     */
+    private Set<BlockMatrixKey> buildKeySet(SparseBlockDistributedMatrix m){
+        Set<BlockMatrixKey> set = new HashSet<>();
+
+        BlockMatrixStorage storage = (BlockMatrixStorage)m.getStorage();
+
+        IgniteUuid uuid = storage.getUUID();
+
+        // Take the block edge from BlockEntry and the dimensions from the matrix itself, instead of a literal
+        // and the test's own fields, so the helper stays correct for any matrix and block size.
+        int edge = BlockEntry.MAX_BLOCK_SIZE;
+
+        int rowBlocks = m.rowSize() / edge + (m.rowSize() % edge > 0 ? 1 : 0);
+        int colBlocks = m.columnSize() / edge + (m.columnSize() % edge > 0 ? 1 : 0);
+
+        long maxBlock = (long)rowBlocks * colBlocks;
+
+        for (long i = 0; i < maxBlock; i++)
+            set.add(new BlockMatrixKey(i, uuid, null));
+
+        return set;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
index a7cd6b5..3fec83c 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
@@ -48,10 +48,10 @@ import static org.apache.ignite.ml.math.impls.MathTestConstants.UNEXPECTED_VAL;
public class SparseDistributedMatrixTest extends GridCommonAbstractTest {
/** Number of nodes in grid */
private static final int NODE_COUNT = 3;
- /** Cache name. */
- private static final String CACHE_NAME = "test-cache";
/** Precision. */
private static final double PRECISION = 0.0;
+ /** */
+ private static final int MATRIX_SIZE = 10;
/** Grid instance. */
private Ignite ignite;
/** Matrix rows */
@@ -90,8 +90,6 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
- ignite.destroyCache(CACHE_NAME);
-
if (cacheMatrix != null) {
cacheMatrix.destroy();
cacheMatrix = null;
@@ -166,7 +164,9 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest {
assertEquals(UNEXPECTED_VAL, cacheMatrix.rowSize() * cacheMatrix.columnSize(), cacheMatrix.sum(), PRECISION);
}
- /** */
+ /**
+ * TODO: IGNITE-5102, wrong min/max, wait for fold/map fix
+ */
public void testMinMax() {
IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
@@ -286,6 +286,28 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest {
}
/** */
+    public void testMatrixTimes() {
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        SparseDistributedMatrix cacheMatrix1 = new SparseDistributedMatrix(MATRIX_SIZE, MATRIX_SIZE,
+            StorageConstants.ROW_STORAGE_MODE, StorageConstants.RANDOM_ACCESS_MODE);
+        SparseDistributedMatrix cacheMatrix2 = new SparseDistributedMatrix(MATRIX_SIZE, MATRIX_SIZE,
+            StorageConstants.ROW_STORAGE_MODE, StorageConstants.RANDOM_ACCESS_MODE);
+
+        Matrix res = null;
+
+        try {
+            // Both operands get the diagonal 0, 1, ..., MATRIX_SIZE - 1; no other cell is written.
+            for (int i = 0; i < MATRIX_SIZE; i++) {
+                cacheMatrix1.setX(i, i, i);
+                cacheMatrix2.setX(i, i, i);
+            }
+
+            res = cacheMatrix1.times(cacheMatrix2);
+
+            // Expect i * i on the diagonal of the product and 0 everywhere else.
+            for (int i = 0; i < MATRIX_SIZE; i++) {
+                for (int j = 0; j < MATRIX_SIZE; j++) {
+                    // Include the cell indices so a failure points at the offending element.
+                    String msg = UNEXPECTED_VAL + " for " + i + ":" + j;
+
+                    if (i == j)
+                        assertEquals(msg, i * i, res.get(i, j), PRECISION);
+                    else
+                        assertEquals(msg, 0, res.get(i, j), PRECISION);
+                }
+            }
+        }
+        finally {
+            // These matrices are locals, so afterTest() (which only destroys the cacheMatrix field)
+            // never releases them; destroy them here even when an assertion fails.
+            if (res != null)
+                res.destroy();
+
+            cacheMatrix2.destroy();
+            cacheMatrix1.destroy();
+        }
+    }
+
+ /** */
private void initMtx(Matrix m) {
for (int i = 0; i < m.rowSize(); i++)
for (int j = 0; j < m.columnSize(); j++)
[08/14] ignite git commit: Merge remote-tracking branch
'origin/master'
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0adaf6e4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0adaf6e4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0adaf6e4
Branch: refs/heads/ignite-5569-debug
Commit: 0adaf6e4a04fa6405a5596e583e760871e1092dd
Parents: 48f2994 0d2b989
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Jul 21 15:31:28 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Jul 21 15:31:28 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/ml/math/DistanceMeasure.java | 2 +-
.../ignite/ml/math/EuclideanDistance.java | 3 +-
.../math/decompositions/EigenDecomposition.java | 2 +-
.../apache/ignite/ml/math/impls/CacheUtils.java | 198 +++++++--
.../ml/math/impls/matrix/AbstractMatrix.java | 4 +-
.../ignite/ml/math/impls/matrix/BlockEntry.java | 50 +++
.../ml/math/impls/matrix/CacheMatrix.java | 9 +-
.../matrix/SparseBlockDistributedMatrix.java | 208 +++++++++
.../impls/matrix/SparseDistributedMatrix.java | 26 +-
.../storage/matrix/BaseBlockMatrixKey.java | 41 ++
.../impls/storage/matrix/BlockMatrixKey.java | 144 ++++++
.../storage/matrix/BlockMatrixStorage.java | 435 +++++++++++++++++++
.../vector/SparseLocalOnHeapVectorStorage.java | 4 +-
.../ignite/ml/math/statistics/Variance.java | 1 +
.../ignite/ml/math/statistics/package-info.java | 22 +
.../org/apache/ignite/ml/math/util/MapUtil.java | 2 +-
.../ignite/ml/math/util/package-info.java | 22 +
.../java/org/apache/ignite/ml/package-info.java | 22 +
.../ml/math/MathImplDistributedTestSuite.java | 2 +
.../SparseDistributedBlockMatrixTest.java | 379 ++++++++++++++++
.../matrix/SparseDistributedMatrixTest.java | 32 +-
21 files changed, 1528 insertions(+), 80 deletions(-)
----------------------------------------------------------------------