You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/04 11:32:57 UTC
[53/60] [abbrv] ignite git commit: IGNITE-5109 Refactoring for
SparseDistributedMatrix
IGNITE-5109 Refactoring for SparseDistributedMatrix
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/156ec536
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/156ec536
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/156ec536
Branch: refs/heads/ignite-5075-cacheStart
Commit: 156ec5360e6ec918878d9d0c6f7a5d04fc8161a0
Parents: c64ad78
Author: Yury Babak <yb...@gridgain.com>
Authored: Wed May 3 20:02:38 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Wed May 3 20:02:38 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/ml/math/impls/CacheUtils.java | 111 +++++++++++++------
.../ml/math/impls/matrix/AbstractMatrix.java | 5 +
.../impls/matrix/DenseLocalOffHeapMatrix.java | 5 -
.../impls/matrix/SparseDistributedMatrix.java | 16 ++-
.../matrix/SparseDistributedMatrixStorage.java | 89 ++++++++++-----
.../ignite/ml/math/MathImplMainTestSuite.java | 5 +-
.../matrix/SparseDistributedMatrixTest.java | 67 +++++++++--
7 files changed, 213 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
index cfb01be..ace399b 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
@@ -28,13 +28,17 @@ import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.ml.math.KeyMapper;
import org.apache.ignite.ml.math.ValueMapper;
import org.apache.ignite.ml.math.functions.IgniteBiFunction;
import org.apache.ignite.ml.math.functions.IgniteConsumer;
import org.apache.ignite.ml.math.functions.IgniteFunction;
+import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage;
/**
* Distribution-related misc. support.
@@ -118,17 +122,21 @@ public class CacheUtils {
}
/**
- * @param cacheName Cache name.
+ * @param matrixUuid Matrix UUID.
* @return Sum obtained using sparse logic.
*/
- public static <K, V> double sparseSum(String cacheName) {
- Collection<Double> subSums = fold(cacheName, (CacheEntry<Integer, Map<Integer, Double>> ce, Double acc) -> {
- Map<Integer, Double> map = ce.entry().getValue();
+ public static <K, V> double sparseSum(IgniteUuid matrixUuid) {
+ Collection<Double> subSums = fold(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce, Double acc) -> {
+ Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> entry = ce.entry();
+ if (entry.getKey().get2().equals(matrixUuid)) {
+ Map<Integer, Double> map = entry.getValue();
- double sum = sum(map.values());
+ double sum = sum(map.values());
- return acc == null ? sum : acc + sum;
- });
+ return acc == null ? sum : acc + sum;
+ } else
+ return acc;
+ }, key -> key.get2().equals(matrixUuid));
return sum(subSums);
}
@@ -172,39 +180,48 @@ public class CacheUtils {
}
/**
- * @param cacheName Cache name.
+ * @param matrixUuid Matrix UUID.
* @return Minimum value obtained using sparse logic.
*/
- public static <K, V> double sparseMin(String cacheName) {
- Collection<Double> mins = fold(cacheName, (CacheEntry<Integer, Map<Integer, Double>> ce, Double acc) -> {
- Map<Integer, Double> map = ce.entry().getValue();
+ public static <K, V> double sparseMin(IgniteUuid matrixUuid) {
+ Collection<Double> mins = fold(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce, Double acc) -> {
+ Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> entry = ce.entry();
- double min = Collections.min(map.values());
+ if (entry.getKey().get2().equals(matrixUuid)) {
+ Map<Integer, Double> map = entry.getValue();
- if (acc == null)
- return min;
- else
- return Math.min(acc, min);
- });
+ double min = Collections.min(map.values());
+
+ if (acc == null)
+ return min;
+ else
+ return Math.min(acc, min);
+ } else
+ return acc;
+ }, key -> key.get2().equals(matrixUuid));
return Collections.min(mins);
}
/**
- * @param cacheName Cache name.
+ * @param matrixUuid Matrix UUID.
* @return Maximum value obtained using sparse logic.
*/
- public static <K, V> double sparseMax(String cacheName) {
- Collection<Double> maxes = fold(cacheName, (CacheEntry<Integer, Map<Integer, Double>> ce, Double acc) -> {
- Map<Integer, Double> map = ce.entry().getValue();
+ public static <K, V> double sparseMax(IgniteUuid matrixUuid) {
+ Collection<Double> maxes = fold(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce, Double acc) -> {
+ Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> entry = ce.entry();
+ if (entry.getKey().get2().equals(matrixUuid)) {
+ Map<Integer, Double> map = entry.getValue();
- double max = Collections.max(map.values());
+ double max = Collections.max(map.values());
- if (acc == null)
- return max;
- else
- return Math.max(acc, max);
- });
+ if (acc == null)
+ return max;
+ else
+ return Math.max(acc, max);
+ } else
+ return acc;
+ }, key -> key.get2().equals(matrixUuid));
return Collections.max(maxes);
}
@@ -254,19 +271,20 @@ public class CacheUtils {
}
/**
- * @param cacheName Cache name.
+ * @param matrixUuid Matrix UUID.
* @param mapper Mapping {@link IgniteFunction}.
*/
- public static <K, V> void sparseMap(String cacheName, IgniteFunction<Double, Double> mapper) {
- foreach(cacheName, (CacheEntry<Integer, Map<Integer, Double>> ce) -> {
- Integer k = ce.entry().getKey();
+ public static <K, V> void sparseMap(IgniteUuid matrixUuid, IgniteFunction<Double, Double> mapper) {
+ foreach(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce) -> {
+ IgniteBiTuple k = ce.entry().getKey();
+
Map<Integer, Double> v = ce.entry().getValue();
for (Map.Entry<Integer, Double> e : v.entrySet())
e.setValue(mapper.apply(e.getValue()));
ce.cache().put(k, v);
- });
+ }, key -> key.get2().equals(matrixUuid));
}
/**
@@ -276,6 +294,17 @@ public class CacheUtils {
* @param <V> Cache value object type.
*/
public static <K, V> void foreach(String cacheName, IgniteConsumer<CacheEntry<K, V>> fun) {
+ foreach(cacheName, fun, null);
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param fun An operation that accepts a cache entry and processes it.
+ * @param keyFilter Cache keys filter.
+ * @param <K> Cache key object type.
+ * @param <V> Cache value object type.
+ */
+ public static <K, V> void foreach(String cacheName, IgniteConsumer<CacheEntry<K, V>> fun, IgnitePredicate<K> keyFilter) {
bcast(cacheName, () -> {
Ignite ignite = Ignition.localIgnite();
IgniteCache<K, V> cache = ignite.getOrCreateCache(cacheName);
@@ -293,7 +322,7 @@ public class CacheUtils {
// Iterate over given partition.
// Query returns an empty cursor if this partition is not stored on this node.
for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part,
- (k, v) -> affinity.mapPartitionToNode(p) == locNode)))
+ (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k)))))
fun.accept(new CacheEntry<>(entry, cache));
}
});
@@ -310,6 +339,20 @@ public class CacheUtils {
* @return Fold operation result.
*/
public static <K, V, A> Collection<A> fold(String cacheName, IgniteBiFunction<CacheEntry<K, V>, A, A> folder) {
+ return fold(cacheName, folder, null);
+ }
+
+ /**
+ * <b>Currently fold supports only commutative operations.<b/>
+ *
+ * @param cacheName Cache name.
+ * @param folder Fold function operating over cache entries.
+ * @param <K> Cache key object type.
+ * @param <V> Cache value object type.
+ * @param <A> Fold result type.
+ * @return Fold operation result.
+ */
+ public static <K, V, A> Collection<A> fold(String cacheName, IgniteBiFunction<CacheEntry<K, V>, A, A> folder, IgnitePredicate<K> keyFilter) {
return bcast(cacheName, () -> {
Ignite ignite = Ignition.localIgnite();
IgniteCache<K, V> cache = ignite.getOrCreateCache(cacheName);
@@ -329,7 +372,7 @@ public class CacheUtils {
// Iterate over given partition.
// Query returns an empty cursor if this partition is not stored on this node.
for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part,
- (k, v) -> affinity.mapPartitionToNode(p) == locNode)))
+ (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k)))))
a = folder.apply(new CacheEntry<>(entry, cache), a);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
index c5edeb1..d1d3904 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
@@ -840,6 +840,11 @@ public abstract class AbstractMatrix implements Matrix {
}
/** {@inheritDoc} */
+ @Override public void destroy() {
+ getStorage().destroy();
+ }
+
+ /** {@inheritDoc} */
@Override public Matrix copy() {
Matrix cp = like(rowSize(), columnSize());
http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/DenseLocalOffHeapMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/DenseLocalOffHeapMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/DenseLocalOffHeapMatrix.java
index 4161228..fad35fd 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/DenseLocalOffHeapMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/DenseLocalOffHeapMatrix.java
@@ -71,11 +71,6 @@ public class DenseLocalOffHeapMatrix extends AbstractMatrix {
}
/** {@inheritDoc} */
- @Override public void destroy() {
- getStorage().destroy();
- }
-
- /** {@inheritDoc} */
@Override protected Matrix likeIdentity() {
int n = rowSize();
Matrix res = like(n, n);
http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
index 10ebdd0..3e508bd 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
@@ -26,6 +26,7 @@
package org.apache.ignite.ml.math.impls.matrix;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.ml.math.Matrix;
import org.apache.ignite.ml.math.StorageConstants;
import org.apache.ignite.ml.math.Vector;
@@ -119,24 +120,24 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo
* @return Matrix with mapped values.
*/
private Matrix mapOverValues(IgniteFunction<Double, Double> mapper) {
- CacheUtils.sparseMap(storage().cache().getName(), mapper);
+ CacheUtils.sparseMap(getUUID(), mapper);
return this;
}
/** {@inheritDoc} */
@Override public double sum() {
- return CacheUtils.sparseSum(storage().cache().getName());
+ return CacheUtils.sparseSum(getUUID());
}
/** {@inheritDoc} */
@Override public double maxValue() {
- return CacheUtils.sparseMax(storage().cache().getName());
+ return CacheUtils.sparseMax(getUUID());
}
/** {@inheritDoc} */
@Override public double minValue() {
- return CacheUtils.sparseMin(storage().cache().getName());
+ return CacheUtils.sparseMin(getUUID());
}
/** {@inheritDoc} */
@@ -146,11 +147,16 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo
/** {@inheritDoc} */
@Override public Matrix like(int rows, int cols) {
- throw new UnsupportedOperationException();
+ return new SparseDistributedMatrix(rows, cols, storage().storageMode(), storage().accessMode());
}
/** {@inheritDoc} */
@Override public Vector likeVector(int crd) {
throw new UnsupportedOperationException();
}
+
+ /** */
+ private IgniteUuid getUUID(){
+ return ((SparseDistributedMatrixStorage) getStorage()).getUUID();
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/SparseDistributedMatrixStorage.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/SparseDistributedMatrixStorage.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/SparseDistributedMatrixStorage.java
index bfc0e9f..816bf44 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/SparseDistributedMatrixStorage.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/SparseDistributedMatrixStorage.java
@@ -23,6 +23,9 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -30,6 +33,7 @@ import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.ml.math.MatrixStorage;
import org.apache.ignite.ml.math.StorageConstants;
@@ -40,19 +44,22 @@ import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
* {@link MatrixStorage} implementation for {@link SparseDistributedMatrix}.
*/
public class SparseDistributedMatrixStorage extends CacheUtils implements MatrixStorage, StorageConstants {
+ /** Cache name used for all instances of {@link SparseDistributedMatrixStorage}.*/
+ public static final String ML_CACHE_NAME = "ML_SPARSE_MATRICES_CONTAINER";
/** Amount of rows in the matrix. */
private int rows;
/** Amount of columns in the matrix. */
private int cols;
-
/** Row or column based storage mode. */
private int stoMode;
/** Random or sequential access mode. */
private int acsMode;
+ /** Matrix uuid. */
+ private IgniteUuid uuid;
/** Actual distributed storage. */
private IgniteCache<
- Integer /* Row or column index. */,
+ IgniteBiTuple<Integer, IgniteUuid> /* Row or column index with matrix uuid. */,
Map<Integer, Double> /* Map-based row or column. */
> cache = null;
@@ -81,14 +88,15 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix
this.acsMode = acsMode;
cache = newCache();
+
+ uuid = IgniteUuid.randomUuid();
}
/**
- *
- *
+ * Create new ML cache if needed.
*/
- private IgniteCache<Integer, Map<Integer, Double>> newCache() {
- CacheConfiguration<Integer, Map<Integer, Double>> cfg = new CacheConfiguration<>();
+ private IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> newCache() {
+ CacheConfiguration<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> cfg = new CacheConfiguration<>();
// Write to primary.
cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
@@ -106,16 +114,18 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix
cfg.setCacheMode(CacheMode.PARTITIONED);
// Random cache name.
- cfg.setName(new IgniteUuid().shortString());
+ cfg.setName(ML_CACHE_NAME);
- return Ignition.localIgnite().getOrCreateCache(cfg);
+ IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> cache = Ignition.localIgnite().getOrCreateCache(cfg);
+
+ return cache;
}
/**
*
*
*/
- public IgniteCache<Integer, Map<Integer, Double>> cache() {
+ public IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> cache() {
return cache;
}
@@ -138,34 +148,36 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix
/** {@inheritDoc} */
@Override public double get(int x, int y) {
if (stoMode == ROW_STORAGE_MODE)
- return matrixGet(cache.getName(), x, y);
+ return matrixGet(x, y);
else
- return matrixGet(cache.getName(), y, x);
+ return matrixGet(y, x);
}
/** {@inheritDoc} */
@Override public void set(int x, int y, double v) {
if (stoMode == ROW_STORAGE_MODE)
- matrixSet(cache.getName(), x, y, v);
+ matrixSet(x, y, v);
else
- matrixSet(cache.getName(), y, x, v);
+ matrixSet(y, x, v);
}
/**
* Distributed matrix get.
*
- * @param cacheName Matrix's cache.
* @param a Row or column index.
* @param b Row or column index.
* @return Matrix value at (a, b) index.
*/
- private double matrixGet(String cacheName, int a, int b) {
+ private double matrixGet(int a, int b) {
// Remote get from the primary node (where given row or column is stored locally).
- return ignite().compute(groupForKey(cacheName, a)).call(() -> {
- IgniteCache<Integer, Map<Integer, Double>> cache = Ignition.localIgnite().getOrCreateCache(cacheName);
+ return ignite().compute(groupForKey(ML_CACHE_NAME, a)).call(() -> {
+ IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> cache = Ignition.localIgnite().getOrCreateCache(ML_CACHE_NAME);
// Local get.
- Map<Integer, Double> map = cache.localPeek(a, CachePeekMode.PRIMARY);
+ Map<Integer, Double> map = cache.localPeek(getCacheKey(a), CachePeekMode.PRIMARY);
+
+ if (map == null)
+ map = cache.get(getCacheKey(a));
return (map == null || !map.containsKey(b)) ? 0.0 : map.get(b);
});
@@ -174,21 +186,25 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix
/**
* Distributed matrix set.
*
- * @param cacheName Matrix's cache.
* @param a Row or column index.
* @param b Row or column index.
* @param v New value to set.
*/
- private void matrixSet(String cacheName, int a, int b, double v) {
+ private void matrixSet(int a, int b, double v) {
// Remote set on the primary node (where given row or column is stored locally).
- ignite().compute(groupForKey(cacheName, a)).run(() -> {
- IgniteCache<Integer, Map<Integer, Double>> cache = Ignition.localIgnite().getOrCreateCache(cacheName);
+ ignite().compute(groupForKey(ML_CACHE_NAME, a)).run(() -> {
+ IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> cache = Ignition.localIgnite().getOrCreateCache(ML_CACHE_NAME);
// Local get.
- Map<Integer, Double> map = cache.localPeek(a, CachePeekMode.PRIMARY);
+ Map<Integer, Double> map = cache.localPeek(getCacheKey(a), CachePeekMode.PRIMARY);
- if (map == null)
- map = acsMode == SEQUENTIAL_ACCESS_MODE ? new Int2DoubleRBTreeMap() : new Int2DoubleOpenHashMap();
+
+ if (map == null) {
+ map = cache.get(getCacheKey(a)); //Remote entry get.
+
+ if (map == null)
+ map = acsMode == SEQUENTIAL_ACCESS_MODE ? new Int2DoubleRBTreeMap() : new Int2DoubleOpenHashMap();
+ }
if (v != 0.0)
map.put(b, v);
@@ -196,10 +212,15 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix
map.remove(b);
// Local put.
- cache.put(a, map);
+ cache.put(getCacheKey(a), map);
});
}
+ /** Build cache key for row/column. */
+ private IgniteBiTuple<Integer, IgniteUuid> getCacheKey(int idx){
+ return new IgniteBiTuple<>(idx, uuid);
+ }
+
/** {@inheritDoc} */
@Override public int columnSize() {
return cols;
@@ -216,6 +237,7 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix
out.writeInt(cols);
out.writeInt(acsMode);
out.writeInt(stoMode);
+ out.writeObject(uuid);
out.writeUTF(cache.getName());
}
@@ -225,6 +247,7 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix
cols = in.readInt();
acsMode = in.readInt();
stoMode = in.readInt();
+ uuid = (IgniteUuid)in.readObject();
cache = ignite().getOrCreateCache(in.readUTF());
}
@@ -253,9 +276,11 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix
return false;
}
- /** Destroy underlying cache. */
+ /** Delete all data from cache. */
@Override public void destroy() {
- cache.destroy();
+ Set<IgniteBiTuple<Integer, IgniteUuid>> keyset = IntStream.range(0, rows).mapToObj(this::getCacheKey).collect(Collectors.toSet());
+
+ cache.clearAll(keyset);
}
/** {@inheritDoc} */
@@ -266,6 +291,7 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix
res = res * 37 + rows;
res = res * 37 + acsMode;
res = res * 37 + stoMode;
+ res = res * 37 + uuid.hashCode();
res = res * 37 + cache.hashCode();
return res;
@@ -282,6 +308,11 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix
SparseDistributedMatrixStorage that = (SparseDistributedMatrixStorage)obj;
return rows == that.rows && cols == that.cols && acsMode == that.acsMode && stoMode == that.stoMode
- && (cache != null ? cache.equals(that.cache) : that.cache == null);
+ && uuid.equals(that.uuid) && (cache != null ? cache.equals(that.cache) : that.cache == null);
+ }
+
+ /** */
+ public IgniteUuid getUUID() {
+ return uuid;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplMainTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplMainTestSuite.java b/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplMainTestSuite.java
index 5f41583..8d6d2af 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplMainTestSuite.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplMainTestSuite.java
@@ -21,12 +21,13 @@ import org.junit.runner.RunWith;
import org.junit.runners.Suite;
/**
- * Test suite for local and distributed tests
+ * Test suite for local and distributed tests.
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
MathImplLocalTestSuite.class,
- MathImplDistributedTestSuite.class
+ MathImplDistributedTestSuite.class,
+ TracerTest.class
})
public class MathImplMainTestSuite {
// No-op.
http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
index 8985806..5ee2e7d 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
@@ -31,12 +31,19 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.ml.math.Matrix;
import org.apache.ignite.ml.math.StorageConstants;
import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException;
import org.apache.ignite.ml.math.impls.MathTestConstants;
+import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
@@ -110,7 +117,7 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest {
double v = Math.random();
cacheMatrix.set(i, j, v);
- assert Double.compare(v, cacheMatrix.get(i, j)) == 0;
+ assertEquals("Unexpected value for matrix element["+ i +" " + j + "]", v, cacheMatrix.get(i, j), PRECISION);
}
}
}
@@ -134,7 +141,7 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest {
SparseDistributedMatrix objRestored = (SparseDistributedMatrix)objInputStream.readObject();
assertTrue(MathTestConstants.VAL_NOT_EQUALS, cacheMatrix.equals(objRestored));
- assertEquals(MathTestConstants.VAL_NOT_EQUALS, objRestored.get(1, 1), 1.0, 0.0);
+ assertEquals(MathTestConstants.VAL_NOT_EQUALS, objRestored.get(1, 1), 1.0, PRECISION);
}
/** Test simple math. */
@@ -225,19 +232,44 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest {
}
/** */
+ public void testCacheBehaviour(){
+ IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+ SparseDistributedMatrix cacheMatrix1 = new SparseDistributedMatrix(rows, cols, StorageConstants.ROW_STORAGE_MODE, StorageConstants.RANDOM_ACCESS_MODE);
+ SparseDistributedMatrix cacheMatrix2 = new SparseDistributedMatrix(rows, cols, StorageConstants.ROW_STORAGE_MODE, StorageConstants.RANDOM_ACCESS_MODE);
+
+ initMtx(cacheMatrix1);
+ initMtx(cacheMatrix2);
+
+ Collection<String> cacheNames = ignite.cacheNames();
+
+ assert cacheNames.contains(SparseDistributedMatrixStorage.ML_CACHE_NAME);
+
+ IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Object> cache = ignite.getOrCreateCache(SparseDistributedMatrixStorage.ML_CACHE_NAME);
+
+ Set<IgniteBiTuple<Integer, IgniteUuid>> keySet1 = buildKeySet(cacheMatrix1);
+ Set<IgniteBiTuple<Integer, IgniteUuid>> keySet2 = buildKeySet(cacheMatrix2);
+
+ assert cache.containsKeys(keySet1);
+ assert cache.containsKeys(keySet2);
+
+ cacheMatrix2.destroy();
+
+ assert cache.containsKeys(keySet1);
+ assert !cache.containsKeys(keySet2);
+
+ cacheMatrix1.destroy();
+
+ assert !cache.containsKeys(keySet1);
+ }
+
+ /** */
public void testLike() {
IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
cacheMatrix = new SparseDistributedMatrix(rows, cols, StorageConstants.ROW_STORAGE_MODE, StorageConstants.RANDOM_ACCESS_MODE);
- try {
- cacheMatrix.like(1, 1);
- fail("UnsupportedOperationException expected.");
- }
- catch (UnsupportedOperationException e) {
- return;
- }
- fail("UnsupportedOperationException expected.");
+ assertNotNull(cacheMatrix.like(1, 1));
}
/** */
@@ -262,4 +294,19 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest {
for (int j = 0; j < m.columnSize(); j++)
m.set(i, j, 1.0);
}
+
+ /** Build key set for SparseDistributedMatrix. */
+ private Set<IgniteBiTuple<Integer, IgniteUuid>> buildKeySet(SparseDistributedMatrix m){
+ Set<IgniteBiTuple<Integer, IgniteUuid>> set = new HashSet<>();
+
+ SparseDistributedMatrixStorage storage = (SparseDistributedMatrixStorage)m.getStorage();
+
+ IgniteUuid uuid = storage.getUUID();
+ int size = storage.storageMode() == StorageConstants.ROW_STORAGE_MODE ? storage.rowSize() : storage.columnSize();
+
+ for (int i = 0; i < size; i++)
+ set.add(new IgniteBiTuple<>(i, uuid));
+
+ return set;
+ }
}