You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/11/17 13:07:59 UTC
[3/3] ignite git commit: IGNITE-5846 Add support of distributed
matrices for OLS regression. This closes #3030.
IGNITE-5846 Add support of distributed matrices for OLS regression. This closes #3030.
Signed-off-by: nikolay_tikhonov <nt...@gridgain.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0a86018
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0a86018
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0a86018
Branch: refs/heads/master
Commit: b0a86018693581065f789635facb88b1e8dac834
Parents: cbd7e39
Author: YuriBabak <y....@gmail.com>
Authored: Fri Nov 17 16:06:34 2017 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Fri Nov 17 16:07:51 2017 +0300
----------------------------------------------------------------------
.../clustering/KMeansDistributedClusterer.java | 13 +-
.../ignite/ml/math/distributed/CacheUtils.java | 278 +++---
.../math/distributed/keys/BlockMatrixKey.java | 38 -
.../distributed/keys/DataStructureCacheKey.java | 35 +
.../math/distributed/keys/MatrixBlockKey.java | 38 +
.../math/distributed/keys/MatrixCacheKey.java | 35 -
.../math/distributed/keys/RowColMatrixKey.java | 2 +-
.../math/distributed/keys/VectorBlockKey.java | 34 +
.../distributed/keys/impl/BlockMatrixKey.java | 164 ----
.../distributed/keys/impl/MatrixBlockKey.java | 162 ++++
.../distributed/keys/impl/SparseMatrixKey.java | 14 +-
.../distributed/keys/impl/VectorBlockKey.java | 151 +++
.../ignite/ml/math/functions/Functions.java | 3 +-
.../ml/math/impls/matrix/AbstractMatrix.java | 24 +-
.../ignite/ml/math/impls/matrix/BlockEntry.java | 50 -
.../ml/math/impls/matrix/MatrixBlockEntry.java | 50 +
.../matrix/SparseBlockDistributedMatrix.java | 153 ++-
.../impls/matrix/SparseDistributedMatrix.java | 102 +-
.../storage/matrix/BlockMatrixStorage.java | 96 +-
.../storage/matrix/BlockVectorStorage.java | 374 ++++++++
.../impls/storage/matrix/MapWrapperStorage.java | 5 +-
.../matrix/SparseDistributedMatrixStorage.java | 21 +-
.../vector/SparseDistributedVectorStorage.java | 280 ++++++
.../vector/SparseBlockDistributedVector.java | 139 +++
.../impls/vector/SparseDistributedVector.java | 157 ++++
.../ml/math/impls/vector/VectorBlockEntry.java | 49 +
.../apache/ignite/ml/math/util/MatrixUtil.java | 4 +-
.../ml/math/MathImplDistributedTestSuite.java | 16 +-
.../SparseDistributedBlockMatrixTest.java | 86 +-
.../matrix/SparseDistributedMatrixTest.java | 27 +-
.../SparseDistributedVectorStorageTest.java | 121 +++
.../SparseBlockDistributedVectorTest.java | 181 ++++
.../vector/SparseDistributedVectorTest.java | 192 ++++
...tedBlockOLSMultipleLinearRegressionTest.java | 926 ++++++++++++++++++
...tributedOLSMultipleLinearRegressionTest.java | 934 +++++++++++++++++++
.../OLSMultipleLinearRegressionTest.java | 1 +
.../ml/regressions/RegressionsTestSuite.java | 2 +-
37 files changed, 4351 insertions(+), 606 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java
index 6c25edc..4286f42 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java
@@ -196,12 +196,11 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri
return list;
},
- key -> key.matrixId().equals(uid),
+ key -> key.dataStructureId().equals(uid),
(list1, list2) -> {
list1.addAll(list2);
return list1;
- },
- ArrayList::new
+ }, ArrayList::new
);
}
@@ -216,7 +215,7 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri
return map;
},
- key -> key.matrixId().equals(points.getUUID()),
+ key -> key.dataStructureId().equals(points.getUUID()),
(map1, map2) -> {
map1.putAll(map2);
return map1;
@@ -247,10 +246,10 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri
countMap.compute(resInd, (ind, v) -> v != null ? v + 1 : 1);
return countMap;
},
- key -> key.matrixId().equals(uid),
+ key -> key.dataStructureId().equals(uid),
(map1, map2) -> MapUtil.mergeMaps(map1, map2, (integer, integer2) -> integer2 + integer,
ConcurrentHashMap::new),
- ConcurrentHashMap::new);
+ ConcurrentHashMap::new);
}
/** */
@@ -278,7 +277,7 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri
return counts;
},
- key -> key.matrixId().equals(uid),
+ key -> key.dataStructureId().equals(uid),
SumsAndCounts::merge, SumsAndCounts::new
);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/CacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/CacheUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/CacheUtils.java
index b9eb386..37384b8 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/CacheUtils.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/CacheUtils.java
@@ -17,16 +17,6 @@
package org.apache.ignite.ml.math.distributed;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.BinaryOperator;
-import java.util.stream.Stream;
-import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
@@ -40,18 +30,21 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.ml.math.KeyMapper;
-import org.apache.ignite.ml.math.distributed.keys.BlockMatrixKey;
-import org.apache.ignite.ml.math.distributed.keys.MatrixCacheKey;
-import org.apache.ignite.ml.math.functions.IgniteBiFunction;
-import org.apache.ignite.ml.math.functions.IgniteBinaryOperator;
-import org.apache.ignite.ml.math.functions.IgniteConsumer;
-import org.apache.ignite.ml.math.functions.IgniteDoubleFunction;
-import org.apache.ignite.ml.math.functions.IgniteFunction;
-import org.apache.ignite.ml.math.functions.IgniteSupplier;
-import org.apache.ignite.ml.math.functions.IgniteTriFunction;
-import org.apache.ignite.ml.math.impls.matrix.BlockEntry;
+import org.apache.ignite.ml.math.distributed.keys.DataStructureCacheKey;
+import org.apache.ignite.ml.math.distributed.keys.RowColMatrixKey;
+import org.apache.ignite.ml.math.distributed.keys.impl.MatrixBlockKey;
+import org.apache.ignite.ml.math.distributed.keys.impl.VectorBlockKey;
+import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException;
+import org.apache.ignite.ml.math.functions.*;
+import org.apache.ignite.ml.math.impls.matrix.MatrixBlockEntry;
+import org.apache.ignite.ml.math.impls.vector.VectorBlockEntry;
+
+import javax.cache.Cache;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BinaryOperator;
+import java.util.stream.Stream;
/**
* Distribution-related misc. support.
@@ -104,11 +97,11 @@ public class CacheUtils {
/**
* @param cacheName Cache name.
- * @param k Key into the cache.
- * @param <K> Key type.
+ * @param k Key into the cache.
+ * @param <K> Key type.
* @return Cluster group for given key.
*/
- public static <K> ClusterGroup groupForKey(String cacheName, K k) {
+ protected static <K> ClusterGroup getClusterGroupForGivenKey(String cacheName, K k) {
return ignite().cluster().forNode(ignite().affinity(cacheName).mapKeyToNode(k));
}
@@ -116,8 +109,8 @@ public class CacheUtils {
* @param cacheName Cache name.
* @param keyMapper {@link KeyMapper} to validate cache key.
* @param valMapper {@link ValueMapper} to obtain double value for given cache key.
- * @param <K> Cache key object type.
- * @param <V> Cache value object type.
+ * @param <K> Cache key object type.
+ * @param <V> Cache value object type.
* @return Sum of the values obtained for valid keys.
*/
public static <K, V> double sum(String cacheName, KeyMapper<K> keyMapper, ValueMapper<V> valMapper) {
@@ -126,8 +119,7 @@ public class CacheUtils {
double v = valMapper.toDouble(ce.entry().getValue());
return acc == null ? v : acc + v;
- }
- else
+ } else
return acc;
});
@@ -146,19 +138,17 @@ public class CacheUtils {
Collection<Double> subSums = fold(cacheName, (CacheEntry<K, V> ce, Double acc) -> {
V v = ce.entry().getValue();
- double sum = 0.0;
+ double sum;
if (v instanceof Map) {
- Map<Integer, Double> map = (Map<Integer, Double>)v;
+ Map<Integer, Double> map = (Map<Integer, Double>) v;
sum = sum(map.values());
- }
- else if (v instanceof BlockEntry) {
- BlockEntry be = (BlockEntry)v;
+ } else if (v instanceof MatrixBlockEntry) {
+ MatrixBlockEntry be = (MatrixBlockEntry) v;
sum = be.sum();
- }
- else
+ } else
throw new UnsupportedOperationException();
return acc == null ? sum : acc + sum;
@@ -180,8 +170,8 @@ public class CacheUtils {
* @param cacheName Cache name.
* @param keyMapper {@link KeyMapper} to validate cache key.
* @param valMapper {@link ValueMapper} to obtain double value for given cache key.
- * @param <K> Cache key object type.
- * @param <V> Cache value object type.
+ * @param <K> Cache key object type.
+ * @param <V> Cache value object type.
* @return Minimum value for valid keys.
*/
public static <K, V> double min(String cacheName, KeyMapper<K> keyMapper, ValueMapper<V> valMapper) {
@@ -193,8 +183,7 @@ public class CacheUtils {
return v;
else
return Math.min(acc, v);
- }
- else
+ } else
return acc;
});
@@ -216,16 +205,14 @@ public class CacheUtils {
double min;
if (v instanceof Map) {
- Map<Integer, Double> map = (Map<Integer, Double>)v;
+ Map<Integer, Double> map = (Map<Integer, Double>) v;
min = Collections.min(map.values());
- }
- else if (v instanceof BlockEntry) {
- BlockEntry be = (BlockEntry)v;
+ } else if (v instanceof MatrixBlockEntry) {
+ MatrixBlockEntry be = (MatrixBlockEntry) v;
min = be.minValue();
- }
- else
+ } else
throw new UnsupportedOperationException();
if (acc == null)
@@ -253,16 +240,14 @@ public class CacheUtils {
double max;
if (v instanceof Map) {
- Map<Integer, Double> map = (Map<Integer, Double>)v;
+ Map<Integer, Double> map = (Map<Integer, Double>) v;
max = Collections.max(map.values());
- }
- else if (v instanceof BlockEntry) {
- BlockEntry be = (BlockEntry)v;
+ } else if (v instanceof MatrixBlockEntry) {
+ MatrixBlockEntry be = (MatrixBlockEntry) v;
max = be.maxValue();
- }
- else
+ } else
throw new UnsupportedOperationException();
if (acc == null)
@@ -279,8 +264,8 @@ public class CacheUtils {
* @param cacheName Cache name.
* @param keyMapper {@link KeyMapper} to validate cache key.
* @param valMapper {@link ValueMapper} to obtain double value for given cache key.
- * @param <K> Cache key object type.
- * @param <V> Cache value object type.
+ * @param <K> Cache key object type.
+ * @param <V> Cache value object type.
* @return Maximum value for valid keys.
*/
public static <K, V> double max(String cacheName, KeyMapper<K> keyMapper, ValueMapper<V> valMapper) {
@@ -292,8 +277,7 @@ public class CacheUtils {
return v;
else
return Math.max(acc, v);
- }
- else
+ } else
return acc;
});
@@ -304,12 +288,12 @@ public class CacheUtils {
* @param cacheName Cache name.
* @param keyMapper {@link KeyMapper} to validate cache key.
* @param valMapper {@link ValueMapper} to obtain double value for given cache key.
- * @param mapper Mapping {@link IgniteFunction}.
- * @param <K> Cache key object type.
- * @param <V> Cache value object type.
+ * @param mapper Mapping {@link IgniteFunction}.
+ * @param <K> Cache key object type.
+ * @param <V> Cache value object type.
*/
public static <K, V> void map(String cacheName, KeyMapper<K> keyMapper, ValueMapper<V> valMapper,
- IgniteFunction<Double, Double> mapper) {
+ IgniteFunction<Double, Double> mapper) {
foreach(cacheName, (CacheEntry<K, V> ce) -> {
K k = ce.entry().getKey();
@@ -321,7 +305,7 @@ public class CacheUtils {
/**
* @param matrixUuid Matrix UUID.
- * @param mapper Mapping {@link IgniteFunction}.
+ * @param mapper Mapping {@link IgniteFunction}.
*/
@SuppressWarnings("unchecked")
public static <K, V> void sparseMap(UUID matrixUuid, IgniteDoubleFunction<Double> mapper, String cacheName) {
@@ -335,18 +319,16 @@ public class CacheUtils {
V v = ce.entry().getValue();
if (v instanceof Map) {
- Map<Integer, Double> map = (Map<Integer, Double>)v;
+ Map<Integer, Double> map = (Map<Integer, Double>) v;
for (Map.Entry<Integer, Double> e : (map.entrySet()))
e.setValue(mapper.apply(e.getValue()));
- }
- else if (v instanceof BlockEntry) {
- BlockEntry be = (BlockEntry)v;
+ } else if (v instanceof MatrixBlockEntry) {
+ MatrixBlockEntry be = (MatrixBlockEntry) v;
be.map(mapper);
- }
- else
+ } else
throw new UnsupportedOperationException();
ce.cache().put(k, v);
@@ -360,34 +342,40 @@ public class CacheUtils {
*/
private static <K> IgnitePredicate<K> sparseKeyFilter(UUID matrixUuid) {
return key -> {
- if (key instanceof MatrixCacheKey)
- return ((MatrixCacheKey)key).matrixId().equals(matrixUuid);
+ if (key instanceof DataStructureCacheKey)
+ return ((DataStructureCacheKey) key).dataStructureId().equals(matrixUuid);
else if (key instanceof IgniteBiTuple)
- return ((IgniteBiTuple<Integer, UUID>)key).get2().equals(matrixUuid);
+ return ((IgniteBiTuple<Integer, UUID>) key).get2().equals(matrixUuid);
+ else if (key instanceof MatrixBlockKey)
+ return ((MatrixBlockKey) key).dataStructureId().equals(matrixUuid);
+ else if (key instanceof RowColMatrixKey)
+ return ((RowColMatrixKey) key).dataStructureId().equals(matrixUuid);
+ else if (key instanceof VectorBlockKey)
+ return ((VectorBlockKey) key).dataStructureId().equals(matrixUuid);
else
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException(); // TODO: handle my poor doubles
};
}
/**
* @param cacheName Cache name.
- * @param fun An operation that accepts a cache entry and processes it.
- * @param <K> Cache key object type.
- * @param <V> Cache value object type.
+ * @param fun An operation that accepts a cache entry and processes it.
+ * @param <K> Cache key object type.
+ * @param <V> Cache value object type.
*/
- public static <K, V> void foreach(String cacheName, IgniteConsumer<CacheEntry<K, V>> fun) {
+ private static <K, V> void foreach(String cacheName, IgniteConsumer<CacheEntry<K, V>> fun) {
foreach(cacheName, fun, null);
}
/**
* @param cacheName Cache name.
- * @param fun An operation that accepts a cache entry and processes it.
+ * @param fun An operation that accepts a cache entry and processes it.
* @param keyFilter Cache keys filter.
- * @param <K> Cache key object type.
- * @param <V> Cache value object type.
+ * @param <K> Cache key object type.
+ * @param <V> Cache value object type.
*/
- public static <K, V> void foreach(String cacheName, IgniteConsumer<CacheEntry<K, V>> fun,
- IgnitePredicate<K> keyFilter) {
+ protected static <K, V> void foreach(String cacheName, IgniteConsumer<CacheEntry<K, V>> fun,
+ IgnitePredicate<K> keyFilter) {
bcast(cacheName, () -> {
Ignite ignite = Ignition.localIgnite();
IgniteCache<K, V> cache = ignite.getOrCreateCache(cacheName);
@@ -405,7 +393,7 @@ public class CacheUtils {
// Iterate over given partition.
// Query returns an empty cursor if this partition is not stored on this node.
for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part,
- (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k)))))
+ (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k)))))
fun.accept(new CacheEntry<>(entry, cache));
}
});
@@ -413,14 +401,14 @@ public class CacheUtils {
/**
* @param cacheName Cache name.
- * @param fun An operation that accepts a cache entry and processes it.
- * @param ignite Ignite.
- * @param keysGen Keys generator.
- * @param <K> Cache key object type.
- * @param <V> Cache value object type.
+ * @param fun An operation that accepts a cache entry and processes it.
+ * @param ignite Ignite.
+ * @param keysGen Keys generator.
+ * @param <K> Cache key object type.
+ * @param <V> Cache value object type.
*/
public static <K, V> void update(String cacheName, Ignite ignite,
- IgniteBiFunction<Ignite, Cache.Entry<K, V>, Stream<Cache.Entry<K, V>>> fun, IgniteSupplier<Set<K>> keysGen) {
+ IgniteBiFunction<Ignite, Cache.Entry<K, V>, Stream<Cache.Entry<K, V>>> fun, IgniteSupplier<Set<K>> keysGen) {
bcast(cacheName, ignite, () -> {
Ignite ig = Ignition.localIgnite();
IgniteCache<K, V> cache = ig.getOrCreateCache(cacheName);
@@ -447,14 +435,14 @@ public class CacheUtils {
/**
* @param cacheName Cache name.
- * @param fun An operation that accepts a cache entry and processes it.
- * @param ignite Ignite.
- * @param keysGen Keys generator.
- * @param <K> Cache key object type.
- * @param <V> Cache value object type.
+ * @param fun An operation that accepts a cache entry and processes it.
+ * @param ignite Ignite.
+ * @param keysGen Keys generator.
+ * @param <K> Cache key object type.
+ * @param <V> Cache value object type.
*/
public static <K, V> void update(String cacheName, Ignite ignite, IgniteConsumer<Cache.Entry<K, V>> fun,
- IgniteSupplier<Set<K>> keysGen) {
+ IgniteSupplier<Set<K>> keysGen) {
bcast(cacheName, ignite, () -> {
Ignite ig = Ignition.localIgnite();
IgniteCache<K, V> cache = ig.getOrCreateCache(cacheName);
@@ -485,10 +473,10 @@ public class CacheUtils {
* <b>Currently fold supports only commutative operations.<b/>
*
* @param cacheName Cache name.
- * @param folder Fold function operating over cache entries.
- * @param <K> Cache key object type.
- * @param <V> Cache value object type.
- * @param <A> Fold result type.
+ * @param folder Fold function operating over cache entries.
+ * @param <K> Cache key object type.
+ * @param <V> Cache value object type.
+ * @param <A> Fold result type.
* @return Fold operation result.
*/
public static <K, V, A> Collection<A> fold(String cacheName, IgniteBiFunction<CacheEntry<K, V>, A, A> folder) {
@@ -499,14 +487,14 @@ public class CacheUtils {
* <b>Currently fold supports only commutative operations.<b/>
*
* @param cacheName Cache name.
- * @param folder Fold function operating over cache entries.
- * @param <K> Cache key object type.
- * @param <V> Cache value object type.
- * @param <A> Fold result type.
+ * @param folder Fold function operating over cache entries.
+ * @param <K> Cache key object type.
+ * @param <V> Cache value object type.
+ * @param <A> Fold result type.
* @return Fold operation result.
*/
public static <K, V, A> Collection<A> fold(String cacheName, IgniteBiFunction<CacheEntry<K, V>, A, A> folder,
- IgnitePredicate<K> keyFilter) {
+ IgnitePredicate<K> keyFilter) {
return bcast(cacheName, () -> {
Ignite ignite = Ignition.localIgnite();
IgniteCache<K, V> cache = ignite.getOrCreateCache(cacheName);
@@ -526,7 +514,7 @@ public class CacheUtils {
// Iterate over given partition.
// Query returns an empty cursor if this partition is not stored on this node.
for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part,
- (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k)))))
+ (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k)))))
a = folder.apply(new CacheEntry<>(entry, cache), a);
}
@@ -537,34 +525,34 @@ public class CacheUtils {
/**
* Distributed version of fold operation.
*
- * @param cacheName Cache name.
- * @param folder Folder.
- * @param keyFilter Key filter.
+ * @param cacheName Cache name.
+ * @param folder Folder.
+ * @param keyFilter Key filter.
* @param accumulator Accumulator.
* @param zeroValSupp Zero value supplier.
*/
public static <K, V, A> A distributedFold(String cacheName, IgniteBiFunction<Cache.Entry<K, V>, A, A> folder,
- IgnitePredicate<K> keyFilter, BinaryOperator<A> accumulator, IgniteSupplier<A> zeroValSupp) {
+ IgnitePredicate<K> keyFilter, BinaryOperator<A> accumulator, IgniteSupplier<A> zeroValSupp) {
return sparseFold(cacheName, folder, keyFilter, accumulator, zeroValSupp, null, null, 0,
- false);
+ false);
}
/**
* Sparse version of fold. This method also applicable to sparse zeroes.
*
- * @param cacheName Cache name.
- * @param folder Folder.
- * @param keyFilter Key filter.
+ * @param cacheName Cache name.
+ * @param folder Folder.
+ * @param keyFilter Key filter.
* @param accumulator Accumulator.
* @param zeroValSupp Zero value supplier.
- * @param defVal Default value.
- * @param defKey Default key.
- * @param defValCnt Def value count.
+ * @param defVal Default value.
+ * @param defKey Default key.
+ * @param defValCnt Def value count.
* @param isNilpotent Is nilpotent.
*/
private static <K, V, A> A sparseFold(String cacheName, IgniteBiFunction<Cache.Entry<K, V>, A, A> folder,
- IgnitePredicate<K> keyFilter, BinaryOperator<A> accumulator, IgniteSupplier<A> zeroValSupp, V defVal, K defKey,
- long defValCnt, boolean isNilpotent) {
+ IgnitePredicate<K> keyFilter, BinaryOperator<A> accumulator, IgniteSupplier<A> zeroValSupp, V defVal, K defKey,
+ long defValCnt, boolean isNilpotent) {
A defRes = zeroValSupp.get();
@@ -591,7 +579,7 @@ public class CacheUtils {
// Iterate over given partition.
// Query returns an empty cursor if this partition is not stored on this node.
for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part,
- (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k)))))
+ (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k)))))
a = folder.apply(entry, a);
}
@@ -601,10 +589,10 @@ public class CacheUtils {
}
public static <K, V, A, W> A reduce(String cacheName, Ignite ignite,
- IgniteTriFunction<W, Cache.Entry<K, V>, A, A> acc,
- IgniteSupplier<W> supp,
- IgniteSupplier<Iterable<Cache.Entry<K, V>>> entriesGen, IgniteBinaryOperator<A> comb,
- IgniteSupplier<A> zeroValSupp) {
+ IgniteTriFunction<W, Cache.Entry<K, V>, A, A> acc,
+ IgniteSupplier<W> supp,
+ IgniteSupplier<Iterable<Cache.Entry<K, V>>> entriesGen, IgniteBinaryOperator<A> comb,
+ IgniteSupplier<A> zeroValSupp) {
A defRes = zeroValSupp.get();
@@ -624,15 +612,15 @@ public class CacheUtils {
}
public static <K, V, A, W> A reduce(String cacheName, IgniteTriFunction<W, Cache.Entry<K, V>, A, A> acc,
- IgniteSupplier<W> supp,
- IgniteSupplier<Iterable<Cache.Entry<K, V>>> entriesGen, IgniteBinaryOperator<A> comb,
- IgniteSupplier<A> zeroValSupp) {
+ IgniteSupplier<W> supp,
+ IgniteSupplier<Iterable<Cache.Entry<K, V>>> entriesGen, IgniteBinaryOperator<A> comb,
+ IgniteSupplier<A> zeroValSupp) {
return reduce(cacheName, Ignition.localIgnite(), acc, supp, entriesGen, comb, zeroValSupp);
}
/**
* @param cacheName Cache name.
- * @param run {@link Runnable} to broadcast to cache nodes for given cache name.
+ * @param run {@link Runnable} to broadcast to cache nodes for given cache name.
*/
public static void bcast(String cacheName, Ignite ignite, IgniteRunnable run) {
ignite.compute(ignite.cluster().forDataNodes(cacheName)).broadcast(run);
@@ -640,8 +628,9 @@ public class CacheUtils {
/**
* Broadcast runnable to data nodes of given cache.
+ *
* @param cacheName Cache name.
- * @param run Runnable.
+ * @param run Runnable.
*/
public static void bcast(String cacheName, IgniteRunnable run) {
bcast(cacheName, ignite(), run);
@@ -649,8 +638,8 @@ public class CacheUtils {
/**
* @param cacheName Cache name.
- * @param call {@link IgniteCallable} to broadcast to cache nodes for given cache name.
- * @param <A> Type returned by the callable.
+ * @param call {@link IgniteCallable} to broadcast to cache nodes for given cache name.
+ * @param <A> Type returned by the callable.
*/
public static <A> Collection<A> bcast(String cacheName, IgniteCallable<A> call) {
return bcast(cacheName, ignite(), call);
@@ -658,13 +647,42 @@ public class CacheUtils {
/**
* Broadcast callable to data nodes of given cache.
+ *
* @param cacheName Cache name.
- * @param ignite Ignite instance.
- * @param call Callable to broadcast.
- * @param <A> Type of callable result.
+ * @param ignite Ignite instance.
+ * @param call Callable to broadcast.
+ * @param <A> Type of callable result.
* @return Results of callable from each node.
*/
public static <A> Collection<A> bcast(String cacheName, Ignite ignite, IgniteCallable<A> call) {
return ignite.compute(ignite.cluster().forDataNodes(cacheName)).broadcast(call);
}
+
+ /**
+ * @param vectorUuid Matrix UUID.
+ * @param mapper Mapping {@link IgniteFunction}.
+ */
+ @SuppressWarnings("unchecked")
+ public static <K, V> void sparseMapForVector(UUID vectorUuid, IgniteDoubleFunction<V> mapper, String cacheName) {
+ A.notNull(vectorUuid, "vectorUuid");
+ A.notNull(cacheName, "cacheName");
+ A.notNull(mapper, "mapper");
+
+ foreach(cacheName, (CacheEntry<K, V> ce) -> {
+ K k = ce.entry().getKey();
+
+ V v = ce.entry().getValue();
+
+ if (v instanceof VectorBlockEntry) {
+ VectorBlockEntry entry = (VectorBlockEntry) v;
+
+ for (int i = 0; i < entry.size(); i++) entry.set(i, (Double) mapper.apply(entry.get(i)));
+
+ ce.cache().put(k, (V) entry);
+ } else {
+ V mappingRes = mapper.apply((Double) v);
+ ce.cache().put(k, mappingRes);
+ }
+ }, sparseKeyFilter(vectorUuid));
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/BlockMatrixKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/BlockMatrixKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/BlockMatrixKey.java
deleted file mode 100644
index 091b325..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/BlockMatrixKey.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.math.distributed.keys;
-
-import org.apache.ignite.internal.util.lang.IgnitePair;
-import org.apache.ignite.ml.math.impls.matrix.SparseBlockDistributedMatrix;
-
-/**
- * Cache key for blocks in {@link SparseBlockDistributedMatrix}.
- *
- * TODO: check if using {@link IgnitePair} will be better for block id.
- */
-public interface BlockMatrixKey extends MatrixCacheKey {
- /**
- * @return block row id.
- */
- public long blockRowId();
-
- /**
- * @return block col id.
- */
- public long blockColId();
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/DataStructureCacheKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/DataStructureCacheKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/DataStructureCacheKey.java
new file mode 100644
index 0000000..d99ea48
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/DataStructureCacheKey.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.distributed.keys;
+
+import java.util.UUID;
+
+/**
+ * Base matrix cache key.
+ */
+public interface DataStructureCacheKey {
+ /**
+ * @return matrix id.
+ */
+ public UUID dataStructureId();
+
+ /**
+ * @return affinity key.
+ */
+ public Object affinityKey();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixBlockKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixBlockKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixBlockKey.java
new file mode 100644
index 0000000..9c76568
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixBlockKey.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.distributed.keys;
+
+import org.apache.ignite.internal.util.lang.IgnitePair;
+import org.apache.ignite.ml.math.impls.matrix.SparseBlockDistributedMatrix;
+
+/**
+ * Cache key for blocks in {@link SparseBlockDistributedMatrix}.
+ *
+ * TODO: check if using {@link IgnitePair} will be better for block id.
+ */
+public interface MatrixBlockKey extends DataStructureCacheKey {
+ /**
+ * @return block row id.
+ */
+ public long blockRowId();
+
+ /**
+ * @return block col id.
+ */
+ public long blockColId();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixCacheKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixCacheKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixCacheKey.java
deleted file mode 100644
index 0242560..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixCacheKey.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.math.distributed.keys;
-
-import java.util.UUID;
-
-/**
- * Base matrix cache key.
- */
-public interface MatrixCacheKey {
- /**
- * @return matrix id.
- */
- public UUID matrixId();
-
- /**
- * @return affinity key.
- */
- public Object affinityKey();
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/RowColMatrixKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/RowColMatrixKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/RowColMatrixKey.java
index 168f49f..78af2e8 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/RowColMatrixKey.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/RowColMatrixKey.java
@@ -22,7 +22,7 @@ import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
/**
* Cache key for {@link SparseDistributedMatrix}.
*/
-public interface RowColMatrixKey extends MatrixCacheKey {
+public interface RowColMatrixKey extends DataStructureCacheKey {
/**
* Return index value(blockId, Row/Col index, etc.)
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/VectorBlockKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/VectorBlockKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/VectorBlockKey.java
new file mode 100644
index 0000000..32af965
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/VectorBlockKey.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.distributed.keys;
+
+import org.apache.ignite.internal.util.lang.IgnitePair;
+import org.apache.ignite.ml.math.impls.vector.SparseBlockDistributedVector;
+
+/**
+ * Cache key for blocks in {@link SparseBlockDistributedVector}.
+ *
+ * TODO: check if using {@link IgnitePair} will be better for block id.
+ */
+public interface VectorBlockKey extends DataStructureCacheKey {
+ /**
+ * @return block id.
+ */
+ public long blockId();
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/BlockMatrixKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/BlockMatrixKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/BlockMatrixKey.java
deleted file mode 100644
index cc8c488..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/BlockMatrixKey.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.math.distributed.keys.impl;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.UUID;
-import org.apache.ignite.binary.BinaryObjectException;
-import org.apache.ignite.binary.BinaryRawReader;
-import org.apache.ignite.binary.BinaryRawWriter;
-import org.apache.ignite.binary.BinaryReader;
-import org.apache.ignite.binary.BinaryWriter;
-import org.apache.ignite.binary.Binarylizable;
-import org.apache.ignite.internal.binary.BinaryUtils;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.ml.math.impls.matrix.BlockEntry;
-import org.apache.ignite.ml.math.impls.matrix.SparseBlockDistributedMatrix;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Key implementation for {@link BlockEntry} using for {@link SparseBlockDistributedMatrix}.
- */
-public class BlockMatrixKey implements org.apache.ignite.ml.math.distributed.keys.BlockMatrixKey, Externalizable, Binarylizable {
- /** */
- private static final long serialVersionUID = 0L;
- /** Block row ID */
- private long blockIdRow;
- /** Block col ID */
- private long blockIdCol;
- /** Matrix ID */
- private UUID matrixUuid;
- /** Block affinity key. */
- private IgniteUuid affinityKey;
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public BlockMatrixKey() {
- // No-op.
- }
-
- /**
- * Construct matrix block key.
- *
- * @param matrixUuid Matrix uuid.
- * @param affinityKey Affinity key.
- */
- public BlockMatrixKey(long rowId, long colId, UUID matrixUuid, @Nullable IgniteUuid affinityKey) {
- assert rowId >= 0;
- assert colId >= 0;
- assert matrixUuid != null;
-
- this.blockIdRow = rowId;
- this.blockIdCol = colId;
- this.matrixUuid = matrixUuid;
- this.affinityKey = affinityKey;
- }
-
- /** {@inheritDoc} */
- @Override public long blockRowId() {
- return blockIdRow;
- }
-
- /** {@inheritDoc} */
- @Override public long blockColId() {
- return blockIdCol;
- }
-
- /** {@inheritDoc} */
- @Override public UUID matrixId() {
- return matrixUuid;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid affinityKey() {
- return affinityKey;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(matrixUuid);
- U.writeGridUuid(out, affinityKey);
- out.writeLong(blockIdRow);
- out.writeLong(blockIdCol);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- matrixUuid = (UUID)in.readObject();
- affinityKey = U.readGridUuid(in);
- blockIdRow = in.readLong();
- blockIdCol = in.readLong();
- }
-
- /** {@inheritDoc} */
- @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
- BinaryRawWriter out = writer.rawWriter();
-
- out.writeUuid(matrixUuid);
- BinaryUtils.writeIgniteUuid(out, affinityKey);
- out.writeLong(blockIdRow);
- out.writeLong(blockIdCol);
- }
-
- /** {@inheritDoc} */
- @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
- BinaryRawReader in = reader.rawReader();
-
- matrixUuid = in.readUuid();
- affinityKey = BinaryUtils.readIgniteUuid(in);
- blockIdRow = in.readLong();
- blockIdCol = in.readLong();
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = 37;
-
- res += res * 37 + blockIdCol;
- res += res * 37 + blockIdRow;
- res += res * 37 + matrixUuid.hashCode();
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object obj) {
- if (obj == this)
- return true;
-
- if (obj == null || obj.getClass() != getClass())
- return false;
-
- BlockMatrixKey that = (BlockMatrixKey)obj;
-
- return blockIdRow == that.blockIdRow && blockIdCol == that.blockIdCol && matrixUuid.equals(that.matrixUuid)
- && F.eq(affinityKey, that.affinityKey);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(BlockMatrixKey.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/MatrixBlockKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/MatrixBlockKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/MatrixBlockKey.java
new file mode 100644
index 0000000..9e8d81e
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/MatrixBlockKey.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.distributed.keys.impl;
+
+import org.apache.ignite.binary.*;
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.ml.math.impls.matrix.MatrixBlockEntry;
+import org.apache.ignite.ml.math.impls.matrix.SparseBlockDistributedMatrix;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.UUID;
+
+/**
+ * Key implementation for {@link MatrixBlockEntry} using for {@link SparseBlockDistributedMatrix}.
+ */
+public class MatrixBlockKey implements org.apache.ignite.ml.math.distributed.keys.MatrixBlockKey, Externalizable, Binarylizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+ /** Block row ID */
+ private long blockIdRow;
+ /** Block col ID */
+ private long blockIdCol;
+ /** Matrix ID */
+ private UUID matrixUuid;
+ /** Block affinity key. */
+ private UUID affinityKey;
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ public MatrixBlockKey() {
+ // No-op.
+ }
+
+ /**
+ * Construct matrix block key.
+ *
+ * @param matrixUuid Matrix uuid.
+ * @param affinityKey Affinity key.
+ */
+ public MatrixBlockKey(long rowId, long colId, UUID matrixUuid, @Nullable UUID affinityKey) {
+ assert rowId >= 0;
+ assert colId >= 0;
+ assert matrixUuid != null;
+
+ this.blockIdRow = rowId;
+ this.blockIdCol = colId;
+ this.matrixUuid = matrixUuid;
+ this.affinityKey = affinityKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long blockRowId() {
+ return blockIdRow;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long blockColId() {
+ return blockIdCol;
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID dataStructureId() {
+ return matrixUuid;
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID affinityKey() {
+ return affinityKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(matrixUuid);
+ out.writeObject(affinityKey);
+ out.writeLong(blockIdRow);
+ out.writeLong(blockIdCol);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ matrixUuid = (UUID)in.readObject();
+ affinityKey = (UUID)in.readObject();
+ blockIdRow = in.readLong();
+ blockIdCol = in.readLong();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ BinaryRawWriter out = writer.rawWriter();
+
+ out.writeUuid(matrixUuid);
+ out.writeUuid(affinityKey);
+ out.writeLong(blockIdRow);
+ out.writeLong(blockIdCol);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ BinaryRawReader in = reader.rawReader();
+
+ matrixUuid = in.readUuid();
+ affinityKey = in.readUuid();
+ blockIdRow = in.readLong();
+ blockIdCol = in.readLong();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = 37;
+
+ res += res * 37 + blockIdCol;
+ res += res * 37 + blockIdRow;
+ res += res * 37 + matrixUuid.hashCode();
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (obj == this)
+ return true;
+
+ if (obj == null || obj.getClass() != getClass())
+ return false;
+
+ MatrixBlockKey that = (MatrixBlockKey)obj;
+
+ return blockIdRow == that.blockIdRow && blockIdCol == that.blockIdCol && matrixUuid.equals(that.matrixUuid)
+ && F.eq(affinityKey, that.affinityKey);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MatrixBlockKey.class, this);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/SparseMatrixKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/SparseMatrixKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/SparseMatrixKey.java
index aa5e0ad..980d433 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/SparseMatrixKey.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/SparseMatrixKey.java
@@ -17,17 +17,18 @@
package org.apache.ignite.ml.math.distributed.keys.impl;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.UUID;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.ml.math.distributed.keys.RowColMatrixKey;
import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.UUID;
+
/**
* Key implementation for {@link SparseDistributedMatrix}.
*/
@@ -65,7 +66,7 @@ public class SparseMatrixKey implements RowColMatrixKey, Externalizable {
}
/** {@inheritDoc} */
- @Override public UUID matrixId() {
+ @Override public UUID dataStructureId() {
return matrixId;
}
@@ -76,7 +77,6 @@ public class SparseMatrixKey implements RowColMatrixKey, Externalizable {
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
-// U.writeGridUuid(out, matrixId);
out.writeObject(matrixId);
out.writeObject(affinityKey);
out.writeInt(idx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/VectorBlockKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/VectorBlockKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/VectorBlockKey.java
new file mode 100644
index 0000000..6052010
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/VectorBlockKey.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.distributed.keys.impl;
+
+import org.apache.ignite.binary.*;
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.ml.math.impls.vector.SparseBlockDistributedVector;
+import org.apache.ignite.ml.math.impls.vector.VectorBlockEntry;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.UUID;
+
+/**
+ * Key implementation for {@link VectorBlockEntry} using for {@link SparseBlockDistributedVector}.
+ */
+public class VectorBlockKey implements org.apache.ignite.ml.math.distributed.keys.VectorBlockKey, Externalizable, Binarylizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+ /** Block row ID */
+ private long blockId;
+ /** Vector ID */
+ private UUID vectorUuid;
+ /** Block affinity key. */
+ private UUID affinityKey;
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ public VectorBlockKey() {
+ // No-op.
+ }
+
+ /**
+ * Construct vector block key.
+ *
+ * @param vectorUuid Vector uuid.
+ * @param affinityKey Affinity key.
+ */
+ public VectorBlockKey(long blockId, UUID vectorUuid, @Nullable UUID affinityKey) {
+ assert blockId >= 0;
+ assert vectorUuid != null;
+
+ this.blockId = blockId;
+ this.vectorUuid = vectorUuid;
+ this.affinityKey = affinityKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long blockId() {
+ return blockId;
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public UUID dataStructureId() {
+ return vectorUuid;
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID affinityKey() {
+ return affinityKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(vectorUuid);
+ out.writeObject(affinityKey);
+ out.writeLong(blockId);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ vectorUuid = (UUID)in.readObject();
+ affinityKey = (UUID)in.readObject();
+ blockId = in.readLong();
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ BinaryRawWriter out = writer.rawWriter();
+
+ out.writeUuid(vectorUuid);
+ out.writeUuid(affinityKey);
+ out.writeLong(blockId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ BinaryRawReader in = reader.rawReader();
+
+ vectorUuid = in.readUuid();
+ affinityKey = in.readUuid();
+ blockId = in.readLong();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = 37;
+
+ res += res * 37 + blockId;
+ res += res * 37 + vectorUuid.hashCode();
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (obj == this)
+ return true;
+
+ if (obj == null || obj.getClass() != getClass())
+ return false;
+
+ VectorBlockKey that = (VectorBlockKey)obj;
+
+ return blockId == that.blockId && vectorUuid.equals(that.vectorUuid)
+ && F.eq(affinityKey, that.affinityKey);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(VectorBlockKey.class, this);
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/functions/Functions.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/functions/Functions.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/functions/Functions.java
index 0b4ad12..ce534bd 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/functions/Functions.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/functions/Functions.java
@@ -17,10 +17,11 @@
package org.apache.ignite.ml.math.functions;
+import org.apache.ignite.lang.IgniteBiTuple;
+
import java.util.Comparator;
import java.util.List;
import java.util.function.BiFunction;
-import org.apache.ignite.lang.IgniteBiTuple;
/**
* Compatibility with Apache Mahout.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
index 06fb34c..89f567e 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
@@ -43,6 +43,7 @@ import org.apache.ignite.ml.math.functions.IgniteTriFunction;
import org.apache.ignite.ml.math.functions.IntIntToDoubleFunction;
import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
import org.apache.ignite.ml.math.impls.vector.MatrixVectorView;
+import org.apache.ignite.ml.math.util.MatrixUtil;
/**
* This class provides a helper implementation of the {@link Matrix}
@@ -282,7 +283,7 @@ public abstract class AbstractMatrix implements Matrix {
*
* @param row Row index.
*/
- private void checkRowIndex(int row) {
+ void checkRowIndex(int row) {
if (row < 0 || row >= rowSize())
throw new RowIndexException(row);
}
@@ -292,7 +293,7 @@ public abstract class AbstractMatrix implements Matrix {
*
* @param col Column index.
*/
- private void checkColumnIndex(int col) {
+ void checkColumnIndex(int col) {
if (col < 0 || col >= columnSize())
throw new ColumnIndexException(col);
}
@@ -303,7 +304,7 @@ public abstract class AbstractMatrix implements Matrix {
* @param row Row index.
* @param col Column index.
*/
- protected void checkIndex(int row, int col) {
+ private void checkIndex(int row, int col) {
checkRowIndex(row);
checkColumnIndex(col);
}
@@ -739,11 +740,12 @@ public abstract class AbstractMatrix implements Matrix {
/** {@inheritDoc} */
@Override public Vector getCol(int col) {
checkColumnIndex(col);
-
- Vector res = new DenseLocalOnHeapVector(rowSize());
+ Vector res;
+ if (isDistributed()) res = MatrixUtil.likeVector(this, rowSize());
+ else res = new DenseLocalOnHeapVector(rowSize());
for (int i = 0; i < rowSize(); i++)
- res.setX(i, getX(i,col));
+ res.setX(i, getX(i, col));
return res;
}
@@ -974,4 +976,14 @@ public abstract class AbstractMatrix implements Matrix {
@Override public void compute(int row, int col, IgniteTriFunction<Integer, Integer, Double, Double> f) {
setX(row, col, f.apply(row, col, getX(row, col)));
}
+
+
+ protected int getMaxAmountOfColumns(double[][] data) {
+ int maxAmountOfColumns = 0;
+
+ for (int i = 0; i < data.length; i++)
+ maxAmountOfColumns = Math.max(maxAmountOfColumns, data[i].length);
+
+ return maxAmountOfColumns;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/BlockEntry.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/BlockEntry.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/BlockEntry.java
deleted file mode 100644
index 47f07ce..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/BlockEntry.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.math.impls.matrix;
-
-import org.apache.ignite.ml.math.Matrix;
-
-/**
- * Block for {@link SparseBlockDistributedMatrix}.
- */
-public final class BlockEntry extends SparseLocalOnHeapMatrix {
- /** Max block size. */
- public static final int MAX_BLOCK_SIZE = 32;
-
- /** */
- public BlockEntry() {
- // No-op.
- }
-
- /** */
- public BlockEntry(int row, int col) {
- super(row, col);
-
- assert col <= MAX_BLOCK_SIZE;
- assert row <= MAX_BLOCK_SIZE;
- }
-
- /** */
- public BlockEntry(Matrix mtx) {
- assert mtx.columnSize() <= MAX_BLOCK_SIZE;
- assert mtx.rowSize() <= MAX_BLOCK_SIZE;
-
- setStorage(mtx.getStorage());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/MatrixBlockEntry.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/MatrixBlockEntry.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/MatrixBlockEntry.java
new file mode 100644
index 0000000..a2d13a1
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/MatrixBlockEntry.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.impls.matrix;
+
+import org.apache.ignite.ml.math.Matrix;
+
+/**
+ * Block for {@link SparseBlockDistributedMatrix}.
+ */
+public final class MatrixBlockEntry extends SparseLocalOnHeapMatrix {
+ /** Max block size. */
+ public static final int MAX_BLOCK_SIZE = 32;
+
+ /** */
+ public MatrixBlockEntry() {
+ // No-op.
+ }
+
+ /** */
+ public MatrixBlockEntry(int row, int col) {
+ super(row, col);
+
+ assert col <= MAX_BLOCK_SIZE;
+ assert row <= MAX_BLOCK_SIZE;
+ }
+
+ /** */
+ public MatrixBlockEntry(Matrix mtx) {
+ assert mtx.columnSize() <= MAX_BLOCK_SIZE;
+ assert mtx.rowSize() <= MAX_BLOCK_SIZE;
+
+ setStorage(mtx.getStorage());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java
index e829168..ea9fb8c 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java
@@ -17,10 +17,6 @@
package org.apache.ignite.ml.math.impls.matrix;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
@@ -31,16 +27,25 @@ import org.apache.ignite.ml.math.Matrix;
import org.apache.ignite.ml.math.StorageConstants;
import org.apache.ignite.ml.math.Vector;
import org.apache.ignite.ml.math.distributed.CacheUtils;
-import org.apache.ignite.ml.math.distributed.keys.impl.BlockMatrixKey;
+import org.apache.ignite.ml.math.distributed.keys.impl.MatrixBlockKey;
+import org.apache.ignite.ml.math.distributed.keys.impl.VectorBlockKey;
import org.apache.ignite.ml.math.exceptions.CardinalityException;
-import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException;
import org.apache.ignite.ml.math.functions.IgniteDoubleFunction;
import org.apache.ignite.ml.math.impls.storage.matrix.BlockMatrixStorage;
+import org.apache.ignite.ml.math.impls.storage.matrix.BlockVectorStorage;
+import org.apache.ignite.ml.math.impls.vector.SparseBlockDistributedVector;
+import org.apache.ignite.ml.math.impls.vector.SparseDistributedVector;
+import org.apache.ignite.ml.math.impls.vector.VectorBlockEntry;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
/**
- * Sparse block distributed matrix. This matrix represented by blocks 32x32 {@link BlockEntry}.
+ * Sparse block distributed matrix. This matrix represented by blocks 32x32 {@link MatrixBlockEntry}.
*
- * Using separate cache with keys {@link BlockMatrixKey} and values {@link BlockEntry}.
+ * Using separate cache with keys {@link MatrixBlockKey} and values {@link MatrixBlockEntry}.
*/
public class SparseBlockDistributedMatrix extends AbstractMatrix implements StorageConstants {
/**
@@ -62,6 +67,18 @@ public class SparseBlockDistributedMatrix extends AbstractMatrix implements Stor
}
/**
+ * @param data Data to fill the matrix
+ */
+ public SparseBlockDistributedMatrix(double[][] data) {
+ assert data.length > 0;
+ setStorage(new BlockMatrixStorage(data.length, getMaxAmountOfColumns(data)));
+
+ for (int i = 0; i < data.length; i++)
+ for (int j = 0; j < data[i].length; j++)
+ storage().set(i, j, data[i][j]);
+ }
+
+ /**
* Return the same matrix with updates values (broken contract).
*
* @param d Value to divide to.
@@ -100,22 +117,22 @@ public class SparseBlockDistributedMatrix extends AbstractMatrix implements Stor
throw new CardinalityException(columnSize(), mtx.rowSize());
SparseBlockDistributedMatrix matrixA = this;
- SparseBlockDistributedMatrix matrixB = (SparseBlockDistributedMatrix)mtx;
+ SparseBlockDistributedMatrix matrixB = (SparseBlockDistributedMatrix) mtx;
String cacheName = this.storage().cacheName();
SparseBlockDistributedMatrix matrixC = new SparseBlockDistributedMatrix(matrixA.rowSize(), matrixB.columnSize());
CacheUtils.bcast(cacheName, () -> {
Ignite ignite = Ignition.localIgnite();
- Affinity<BlockMatrixKey> affinity = ignite.affinity(cacheName);
+ Affinity<MatrixBlockKey> affinity = ignite.affinity(cacheName);
- IgniteCache<BlockMatrixKey, BlockEntry> cache = ignite.getOrCreateCache(cacheName);
+ IgniteCache<MatrixBlockKey, MatrixBlockEntry> cache = ignite.getOrCreateCache(cacheName);
ClusterNode locNode = ignite.cluster().localNode();
BlockMatrixStorage storageC = matrixC.storage();
- Map<ClusterNode, Collection<BlockMatrixKey>> keysCToNodes = affinity.mapKeysToNodes(storageC.getAllKeys());
- Collection<BlockMatrixKey> locKeys = keysCToNodes.get(locNode);
+ Map<ClusterNode, Collection<MatrixBlockKey>> keysCToNodes = affinity.mapKeysToNodes(storageC.getAllKeys());
+ Collection<MatrixBlockKey> locKeys = keysCToNodes.get(locNode);
if (locKeys == null)
return;
@@ -128,18 +145,18 @@ public class SparseBlockDistributedMatrix extends AbstractMatrix implements Stor
IgnitePair<Long> newBlockId = new IgnitePair<>(newBlockIdRow, newBlockIdCol);
- BlockEntry blockC = null;
+ MatrixBlockEntry blockC = null;
- List<BlockEntry> aRow = matrixA.storage().getRowForBlock(newBlockId);
- List<BlockEntry> bCol = matrixB.storage().getColForBlock(newBlockId);
+ List<MatrixBlockEntry> aRow = matrixA.storage().getRowForBlock(newBlockId);
+ List<MatrixBlockEntry> bCol = matrixB.storage().getColForBlock(newBlockId);
for (int i = 0; i < aRow.size(); i++) {
- BlockEntry blockA = aRow.get(i);
- BlockEntry blockB = bCol.get(i);
+ MatrixBlockEntry blockA = aRow.get(i);
+ MatrixBlockEntry blockB = bCol.get(i);
- BlockEntry tmpBlock = new BlockEntry(blockA.times(blockB));
+ MatrixBlockEntry tmpBlock = new MatrixBlockEntry(blockA.times(blockB));
- blockC = blockC == null ? tmpBlock : new BlockEntry(blockC.plus(tmpBlock));
+ blockC = blockC == null ? tmpBlock : new MatrixBlockEntry(blockC.plus(tmpBlock));
}
cache.put(storageC.getCacheKey(newBlockIdRow, newBlockIdCol), blockC);
@@ -149,6 +166,90 @@ public class SparseBlockDistributedMatrix extends AbstractMatrix implements Stor
return matrixC;
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @SuppressWarnings({"unchecked"})
+ @Override public Vector times(final Vector vec) {
+ if (vec == null)
+ throw new IllegalArgumentException("The vector should be not null.");
+
+ if (columnSize() != vec.size())
+ throw new CardinalityException(columnSize(), vec.size());
+
+ SparseBlockDistributedMatrix matrixA = this;
+ SparseBlockDistributedVector vectorB = (SparseBlockDistributedVector) vec;
+
+
+ String cacheName = this.storage().cacheName();
+ SparseBlockDistributedVector vectorC = new SparseBlockDistributedVector(matrixA.rowSize());
+
+ CacheUtils.bcast(cacheName, () -> {
+ Ignite ignite = Ignition.localIgnite();
+ Affinity<VectorBlockKey> affinity = ignite.affinity(cacheName);
+
+ IgniteCache<VectorBlockKey, VectorBlockEntry> cache = ignite.getOrCreateCache(cacheName);
+ ClusterNode locNode = ignite.cluster().localNode();
+
+ BlockVectorStorage storageC = vectorC.storage();
+
+ Map<ClusterNode, Collection<VectorBlockKey>> keysCToNodes = affinity.mapKeysToNodes(storageC.getAllKeys());
+ Collection<VectorBlockKey> locKeys = keysCToNodes.get(locNode);
+
+ if (locKeys == null)
+ return;
+
+ // compute Cij locally on each node
+ // TODO: IGNITE:5114, exec in parallel
+ locKeys.forEach(key -> {
+ long newBlockId = key.blockId();
+
+
+ IgnitePair<Long> newBlockIdForMtx = new IgnitePair<>(newBlockId, 0L);
+
+ VectorBlockEntry blockC = null;
+
+ List<MatrixBlockEntry> aRow = matrixA.storage().getRowForBlock(newBlockIdForMtx);
+ List<VectorBlockEntry> bCol = vectorB.storage().getColForBlock(newBlockId);
+
+ for (int i = 0; i < aRow.size(); i++) {
+ MatrixBlockEntry blockA = aRow.get(i);
+ VectorBlockEntry blockB = bCol.get(i);
+
+ VectorBlockEntry tmpBlock = new VectorBlockEntry(blockA.times(blockB));
+
+ blockC = blockC == null ? tmpBlock : new VectorBlockEntry(blockC.plus(tmpBlock));
+ }
+
+ cache.put(storageC.getCacheKey(newBlockId), blockC);
+ });
+ });
+ return vectorC;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Vector getCol(int col) {
+ checkColumnIndex(col);
+
+ Vector res = new SparseDistributedVector(rowSize());
+
+ for (int i = 0; i < rowSize(); i++)
+ res.setX(i, getX(i, col));
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Vector getRow(int row) {
+ checkRowIndex(row);
+
+ Vector res = new SparseDistributedVector(columnSize());
+
+ for (int i = 0; i < columnSize(); i++)
+ res.setX(i, getX(row, i));
+ return res;
+ }
+
/** {@inheritDoc} */
@Override public Matrix assign(double val) {
return mapOverValues(v -> val);
@@ -176,7 +277,11 @@ public class SparseBlockDistributedMatrix extends AbstractMatrix implements Stor
/** {@inheritDoc} */
@Override public Matrix copy() {
- throw new UnsupportedOperationException();
+ Matrix cp = like(rowSize(), columnSize());
+
+ cp.assign(this);
+
+ return cp;
}
/** {@inheritDoc} */
@@ -186,12 +291,12 @@ public class SparseBlockDistributedMatrix extends AbstractMatrix implements Stor
/** {@inheritDoc} */
@Override public Vector likeVector(int crd) {
- throw new UnsupportedOperationException();
+ return new SparseBlockDistributedVector(crd);
}
/** */
private UUID getUUID() {
- return ((BlockMatrixStorage)getStorage()).getUUID();
+ return ((BlockMatrixStorage) getStorage()).getUUID();
}
/**
@@ -208,6 +313,6 @@ public class SparseBlockDistributedMatrix extends AbstractMatrix implements Stor
*
*/
private BlockMatrixStorage storage() {
- return (BlockMatrixStorage)getStorage();
+ return (BlockMatrixStorage) getStorage();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
index 594aebc..497241d 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
@@ -17,24 +17,24 @@
package org.apache.ignite.ml.math.impls.matrix;
-import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.ml.math.Matrix;
import org.apache.ignite.ml.math.StorageConstants;
import org.apache.ignite.ml.math.Vector;
import org.apache.ignite.ml.math.distributed.CacheUtils;
import org.apache.ignite.ml.math.distributed.keys.RowColMatrixKey;
import org.apache.ignite.ml.math.exceptions.CardinalityException;
-import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException;
import org.apache.ignite.ml.math.functions.IgniteDoubleFunction;
import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage;
+import org.apache.ignite.ml.math.impls.storage.vector.SparseDistributedVectorStorage;
+import org.apache.ignite.ml.math.impls.vector.SparseDistributedVector;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
/**
* Sparse distributed matrix implementation based on data grid.
@@ -68,6 +68,28 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo
assertStorageMode(stoMode);
setStorage(new SparseDistributedMatrixStorage(rows, cols, stoMode, acsMode));
+
+ }
+
+ /**
+ * @param data Data to fill the matrix
+ */
+ public SparseDistributedMatrix(double[][] data) {
+ assert data.length > 0;
+ setStorage(new SparseDistributedMatrixStorage(data.length, getMaxAmountOfColumns(data), StorageConstants.ROW_STORAGE_MODE, StorageConstants.RANDOM_ACCESS_MODE));
+
+ for (int i = 0; i < data.length; i++)
+ for (int j = 0; j < data[i].length; j++)
+ storage().set(i,j,data[i][j]);
+ }
+
+
+ /**
+ * @param rows Amount of rows in the matrix.
+ * @param cols Amount of columns in the matrix.
+ */
+ public SparseDistributedMatrix(int rows, int cols) {
+ this(rows, cols, StorageConstants.ROW_STORAGE_MODE, StorageConstants.RANDOM_ACCESS_MODE);
}
/** */
@@ -122,7 +144,6 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo
Ignite ignite = Ignition.localIgnite();
Affinity<RowColMatrixKey> affinity = ignite.affinity(cacheName);
- IgniteCache<RowColMatrixKey, BlockEntry> cache = ignite.getOrCreateCache(cacheName);
ClusterNode locNode = ignite.cluster().localNode();
SparseDistributedMatrixStorage storageC = matrixC.storage();
@@ -141,17 +162,17 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo
int idx = key.index();
if (isRowMode){
- Vector Aik = matrixA.getCol(idx);
+ Vector Aik = matrixA.getRow(idx);
- for (int i = 0; i < columnSize(); i++) {
- Vector Bkj = matrixB.getRow(i);
+ for (int i = 0; i < matrixB.columnSize(); i++) {
+ Vector Bkj = matrixB.getCol(i);
matrixC.set(idx, i, Aik.times(Bkj).sum());
}
} else {
- Vector Bkj = matrixB.getRow(idx);
+ Vector Bkj = matrixB.getCol(idx);
- for (int i = 0; i < rowSize(); i++) {
- Vector Aik = matrixA.getCol(i);
+ for (int i = 0; i < matrixA.rowSize(); i++) {
+ Vector Aik = matrixA.getRow(i);
matrixC.set(idx, i, Aik.times(Bkj).sum());
}
}
@@ -161,6 +182,49 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo
return matrixC;
}
+
+ /** {@inheritDoc} */
+ @Override public Vector times(Vector vec) {
+ if (vec == null)
+ throw new IllegalArgumentException("The vector should be not null.");
+
+ if (columnSize() != vec.size())
+ throw new CardinalityException(columnSize(), vec.size());
+
+ SparseDistributedMatrix matrixA = this;
+ SparseDistributedVector vectorB = (SparseDistributedVector) vec;
+
+ String cacheName = storage().cacheName();
+ int rows = this.rowSize();
+
+ SparseDistributedVector vectorC = (SparseDistributedVector) likeVector(rows);
+
+ CacheUtils.bcast(cacheName, () -> {
+ Ignite ignite = Ignition.localIgnite();
+ Affinity<RowColMatrixKey> affinity = ignite.affinity(cacheName);
+
+ ClusterNode locNode = ignite.cluster().localNode();
+
+ SparseDistributedVectorStorage storageC = vectorC.storage();
+
+ Map<ClusterNode, Collection<RowColMatrixKey>> keysCToNodes = affinity.mapKeysToNodes(storageC.getAllKeys());
+ Collection<RowColMatrixKey> locKeys = keysCToNodes.get(locNode);
+
+ if (locKeys == null)
+ return;
+
+ // compute Cij locally on each node
+ // TODO: IGNITE:5114, exec in parallel
+ locKeys.forEach(key -> {
+ int idx = key.index();
+ Vector Aik = matrixA.getRow(idx);
+ vectorC.set(idx, Aik.times(vectorB).sum());
+ });
+ });
+
+ return vectorC;
+ }
+
/** {@inheritDoc} */
@Override public Matrix assign(double val) {
return mapOverValues(v -> val);
@@ -198,17 +262,23 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo
/** {@inheritDoc} */
@Override public Matrix copy() {
- throw new UnsupportedOperationException();
+ Matrix cp = like(rowSize(), columnSize());
+
+ cp.assign(this);
+
+ return cp;
}
/** {@inheritDoc} */
@Override public Matrix like(int rows, int cols) {
- return new SparseDistributedMatrix(rows, cols, storage().storageMode(), storage().accessMode());
+ if(storage()==null) return new SparseDistributedMatrix(rows, cols);
+ else return new SparseDistributedMatrix(rows, cols, storage().storageMode(), storage().accessMode());
+
}
/** {@inheritDoc} */
@Override public Vector likeVector(int crd) {
- throw new UnsupportedOperationException();
+ return new SparseDistributedVector(crd, StorageConstants.RANDOM_ACCESS_MODE);
}
/** */