You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/07 18:28:06 UTC

[1/2] incubator-ignite git commit: # IGNITE-289 Create IgniteIterators and IgniteIterable utility classes.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-289 [created] 12fa88943


# IGNITE-289 Create IgniteIterators and IgniteIterable utility classes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8b61e1c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8b61e1c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8b61e1c9

Branch: refs/heads/ignite-289
Commit: 8b61e1c963f2dad4ccc08064f46911bf04fb05bd
Parents: 4cf2133
Author: sevdokimov <se...@gridgain.com>
Authored: Thu May 7 19:20:53 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Thu May 7 19:20:53 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/IgniteIterables.java   | 107 +++++++++++++++++++
 .../ignite/internal/util/IgniteIterators.java   |  82 ++++++++++++++
 2 files changed, 189 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b61e1c9/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIterables.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIterables.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIterables.java
new file mode 100644
index 0000000..0c1dc66
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIterables.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+@SuppressWarnings("PublicInnerClass")
+public class IgniteIterables {
+    /**
+     * Private constructor.
+     */
+    private IgniteIterables() {
+        // No-op.
+    }
+
+    /**
+     * @param iterable Input iterable.
+     * @param filter Filter.
+     */
+    public static <T> Iterable<T> filter(@NotNull Iterable<T> iterable, @NotNull IgnitePredicate<T> filter) {
+        return new FilteredIterable<>(iterable, filter);
+    }
+    /**
+     * @param iterable Input iterable.
+     * @param trans Transformation closure.
+     */
+    public static <T1, T2> Iterable<T2> transform(@NotNull Iterable<T1> iterable,
+        @NotNull IgniteClosure<? super T1, T2> trans) {
+        return new TransformedIterable<>(iterable, trans);
+    }
+
+    /**
+     *
+     */
+    public static class TransformedIterable<T1, T2> implements Iterable<T2> {
+        /** */
+        private final Iterable<T1> src;
+
+        /** */
+        private final IgniteClosure<? super T1, T2> trans;
+
+        /**
+         * @param src Source.
+         * @param trans Trans.
+         */
+        public TransformedIterable(Iterable<T1> src, IgniteClosure<? super T1, T2> trans) {
+            this.src = src;
+            this.trans = trans;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<T2> iterator() {
+            return IgniteIterators.transform(src.iterator(), trans);
+        }
+    }
+
+    /**
+     *
+     */
+    public static class FilteredIterable<T> implements Iterable<T> {
+        /** */
+        private final Iterable<T> src;
+
+        /** */
+        private final IgnitePredicate<T> filter;
+
+        /**
+         * @param src Iterable.
+         * @param filter Filter.
+         */
+        public FilteredIterable(Iterable<T> src, IgnitePredicate<T> filter) {
+            this.src = src;
+            this.filter = filter;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<T> iterator() {
+            return new GridFilteredIterator<T>(src.iterator()) {
+                @Override protected boolean accept(T t) {
+                    return filter.apply(t);
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b61e1c9/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIterators.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIterators.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIterators.java
new file mode 100644
index 0000000..33a9e43
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIterators.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+@SuppressWarnings("PublicInnerClass")
+public class IgniteIterators {
+    /**
+     * Private constructor.
+     */
+    private IgniteIterators() {
+        // No-op.
+    }
+
+    /**
+     * @param itr Itr.
+     * @param trans Trans.
+     */
+    public static <T1, T2> Iterator<T2> transform(@NotNull Iterator<T1> itr,
+        @NotNull IgniteClosure<? super T1, T2> trans) {
+        return new TransformIterator<>(itr, trans);
+    }
+
+    /**
+     *
+     */
+    public static class TransformIterator<T1, T2> implements Iterator<T2> {
+        /** */
+        private final Iterator<T1> src;
+
+        /** */
+        private final IgniteClosure<? super T1, T2> trans;
+
+        /**
+         * @param src Source.
+         * @param trans Trans.
+         */
+        public TransformIterator(Iterator<T1> src, IgniteClosure<? super T1, T2> trans) {
+            this.src = src;
+            this.trans = trans;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            return src.hasNext();
+        }
+
+        /** {@inheritDoc} */
+        @Override public T2 next() {
+            T1 res = src.next();
+
+            return trans.apply(res);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void remove() {
+            src.remove();
+        }
+    }
+}


[2/2] incubator-ignite git commit: # IGNITE-289 (Need to get rid of locPart reference in DHT cache entry) Store local partitions in array.

Posted by se...@apache.org.
# IGNITE-289 (Need to get rid of locPart reference in DHT cache entry) Store local partitions in array.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/12fa8894
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/12fa8894
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/12fa8894

Branch: refs/heads/ignite-289
Commit: 12fa88943e133ad54f7a70b654952482f2f267cf
Parents: 8b61e1c
Author: sevdokimov <se...@gridgain.com>
Authored: Thu May 7 19:27:50 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Thu May 7 19:27:50 2015 +0300

----------------------------------------------------------------------
 .../dht/GridDhtPartitionTopology.java           |   2 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  35 +++--
 .../distributed/near/GridNearCacheAdapter.java  |  64 ++++-----
 .../internal/util/RarefiedConcurrentIntMap.java | 139 +++++++++++++++++++
 .../ignite/internal/util/lang/GridFunc.java     |  19 +++
 5 files changed, 214 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/12fa8894/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index c551fb3..bad5efb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -121,7 +121,7 @@ public interface GridDhtPartitionTopology {
      *
      * @return All current local partitions.
      */
-    public Collection<GridDhtLocalPartition> currentLocalPartitions();
+    public Iterable<GridDhtLocalPartition> currentLocalPartitions();
 
     /**
      * @return Local IDs.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/12fa8894/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 073e0e7..e2f77de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -28,10 +28,8 @@ import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
-import org.jsr166.*;
 
 import java.util.*;
-import java.util.concurrent.*;
 import java.util.concurrent.locks.*;
 
 import static org.apache.ignite.events.EventType.*;
@@ -55,8 +53,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
     private final IgniteLogger log;
 
     /** */
-    private final ConcurrentMap<Integer, GridDhtLocalPartition> locParts =
-        new ConcurrentHashMap8<>();
+    private final RarefiedConcurrentIntMap<GridDhtLocalPartition> locParts;
 
     /** Node to partition map. */
     private GridDhtPartitionFullMap node2part;
@@ -90,6 +87,8 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
 
         this.cctx = cctx;
 
+        locParts = new RarefiedConcurrentIntMap<>(cctx.affinity().partitions());
+
         log = cctx.logger(getClass());
     }
 
@@ -120,7 +119,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
         boolean changed = false;
 
         // Synchronously wait for all renting partitions to complete.
-        for (Iterator<GridDhtLocalPartition> it = locParts.values().iterator(); it.hasNext();) {
+        for (Iterator<GridDhtLocalPartition> it = locParts.iterator(); it.hasNext();) {
             GridDhtLocalPartition p = it.next();
 
             GridDhtPartitionState state = p.state();
@@ -557,12 +556,16 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public List<GridDhtLocalPartition> localPartitions() {
-        return new LinkedList<>(locParts.values());
+        List<GridDhtLocalPartition> res = new LinkedList<>();
+
+        locParts.addAllTo(res);
+
+        return res;
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<GridDhtLocalPartition> currentLocalPartitions() {
-        return locParts.values();
+    @Override public Iterable<GridDhtLocalPartition> currentLocalPartitions() {
+        return locParts;
     }
 
     /** {@inheritDoc} */
@@ -603,8 +606,16 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
         lock.readLock().lock();
 
         try {
-            return new GridDhtPartitionMap(cctx.nodeId(), updateSeq.get(),
-                F.viewReadOnly(locParts, CU.<K, V>part2state()), true);
+            GridDhtPartitionMap res = new GridDhtPartitionMap(cctx.nodeId(), updateSeq.get());
+
+            for (int i = 0; i < locParts.maxIndex(); i++) {
+                GridDhtLocalPartition part = locParts.get(i);
+
+                if (part.state().active())
+                    res.put(i, part.state());
+            }
+
+            return res;
         }
         finally {
             lock.readLock().unlock();
@@ -950,7 +961,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
 
         UUID locId = cctx.nodeId();
 
-        for (GridDhtLocalPartition part : locParts.values()) {
+        for (GridDhtLocalPartition part : locParts) {
             GridDhtPartitionState state = part.state();
 
             if (state.active()) {
@@ -1172,7 +1183,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
     @Override public void printMemoryStats(int threshold) {
         X.println(">>>  Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']');
 
-        for (GridDhtLocalPartition part : locParts.values()) {
+        for (GridDhtLocalPartition part : locParts) {
             int size = part.size();
 
             if (size >= threshold)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/12fa8894/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 29c1d45..339649e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -329,42 +330,41 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
         @Nullable final CacheEntryPredicate... filter) {
         final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
-        Collection<Cache.Entry<K, V>> entries =
-            F.flatCollections(
-                F.viewReadOnly(
-                    dht().topology().currentLocalPartitions(),
-                    new C1<GridDhtLocalPartition, Collection<Cache.Entry<K, V>>>() {
-                        @Override public Collection<Cache.Entry<K, V>> apply(GridDhtLocalPartition p) {
-                            Collection<GridDhtCacheEntry> entries0 = p.entries();
-
-                            if (!F.isEmpty(filter))
-                                entries0 = F.view(entries0, new CacheEntryPredicateAdapter() {
-                                    @Override public boolean apply(GridCacheEntryEx e) {
-                                        return F.isAll(e, filter);
-                                    }
-                                });
-
-                            return F.viewReadOnly(
-                                entries0,
-                                new C1<GridDhtCacheEntry, Cache.Entry<K, V>>() {
-                                    @Override public Cache.Entry<K, V> apply(GridDhtCacheEntry e) {
-                                        return e.wrapLazyValue();
-                                    }
-                                },
-                                new P1<GridDhtCacheEntry>() {
-                                    @Override public boolean apply(GridDhtCacheEntry e) {
-                                        return !e.obsoleteOrDeleted();
-                                    }
-                                });
+        Iterable<GridDhtLocalPartition> primaryOnly = IgniteIterables.filter(dht().topology().currentLocalPartitions(),
+            new P1<GridDhtLocalPartition>() {
+            @Override public boolean apply(GridDhtLocalPartition p) {
+                return p.primary(topVer);
+            }
+        });
+
+        Iterable<Collection<Cache.Entry<K, V>>> entriesCol = IgniteIterables.transform(primaryOnly,
+            new C1<GridDhtLocalPartition, Collection<Cache.Entry<K, V>>>() {
+            @Override public Collection<Cache.Entry<K, V>> apply(GridDhtLocalPartition p) {
+                Collection<GridDhtCacheEntry> entries0 = p.entries();
+
+                if (!F.isEmpty(filter))
+                    entries0 = F.view(entries0, new CacheEntryPredicateAdapter() {
+                        @Override public boolean apply(GridCacheEntryEx e) {
+                            return F.isAll(e, filter);
+                        }
+                    });
+
+                return F.viewReadOnly(
+                    entries0,
+                    new C1<GridDhtCacheEntry, Cache.Entry<K, V>>() {
+                        @Override public Cache.Entry<K, V> apply(GridDhtCacheEntry e) {
+                            return e.wrapLazyValue();
                         }
                     },
-                    new P1<GridDhtLocalPartition>() {
-                        @Override public boolean apply(GridDhtLocalPartition p) {
-                            return p.primary(topVer);
+                    new P1<GridDhtCacheEntry>() {
+                        @Override public boolean apply(GridDhtCacheEntry e) {
+                            return !e.obsoleteOrDeleted();
                         }
-                    }));
+                    });
+            }
+        });
 
-        return new GridCacheEntrySet<>(ctx, entries, null);
+        return new GridCacheEntrySet<>(ctx, F.flatCollections(entriesCol), null);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/12fa8894/modules/core/src/main/java/org/apache/ignite/internal/util/RarefiedConcurrentIntMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/RarefiedConcurrentIntMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/RarefiedConcurrentIntMap.java
new file mode 100644
index 0000000..0e7c0a1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/RarefiedConcurrentIntMap.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public class RarefiedConcurrentIntMap<T> implements Iterable<T> {
+    /** */
+    private final AtomicReferenceArray<T> arr;
+
+    /** */
+    private final int maxIdx;
+
+    /**
+     * @param maxIdx Max element index.
+     */
+    public RarefiedConcurrentIntMap(int maxIdx) {
+        arr = new AtomicReferenceArray<T>(maxIdx);
+
+        this.maxIdx = maxIdx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterator<T> iterator() {
+        return new Iterator<T>() {
+
+            private int idx;
+
+            private T next;
+
+            private T lastReturned;
+
+            private int lastReturnedIdx;
+
+            private void advance() {
+                while (next == null && idx < maxIdx)
+                    next = arr.get(idx++);
+            }
+
+            @Override public boolean hasNext() {
+                advance();
+
+                return next != null;
+            }
+
+            @Override public T next() {
+                advance();
+
+                if (next == null)
+                    throw new NoSuchElementException();
+
+                lastReturned = next;
+                lastReturnedIdx = idx - 1;
+
+                next = null;
+
+                return lastReturned;
+            }
+
+            @Override public void remove() {
+                if (lastReturned == null)
+                    throw new IllegalStateException();
+
+                arr.compareAndSet(lastReturnedIdx, lastReturned, null);
+
+                lastReturned = null;
+            }
+        };
+    }
+
+    /**
+     * @param idx Index.
+     */
+    public T get(int idx) {
+        return arr.get(idx);
+    }
+
+    /**
+     *
+     */
+    public int maxIndex() {
+        return maxIdx;
+    }
+
+    /**
+     * @param idx Index.
+     * @param expVal Expected value.
+     */
+    public boolean remove(int idx, T expVal) {
+        return arr.compareAndSet(idx, expVal, null);
+    }
+
+    /**
+     * @param idx Index.
+     * @param val Value.
+     */
+    public T putIfAbsent(int idx, T val) {
+        while (true) {
+            if (arr.compareAndSet(idx, null, val))
+                return null;
+
+            T res = arr.get(idx);
+
+            if (res != null)
+                return res;
+        }
+    }
+
+    /**
+     * @param c Closure.
+     */
+    public void addAllTo(Collection<? super T> c) {
+        for (int i = 0; i < maxIdx; i++) {
+            T e = arr.get(i);
+
+            if (e != null)
+                c.add(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/12fa8894/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index c86c5a4..c6137b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -2884,6 +2884,25 @@ public class GridFunc {
         if (F.isEmpty(c))
             return Collections.emptyList();
 
+        return flatCollections((Iterable<? extends Collection<T>>)c);
+    }
+
+    /**
+     * Flattens collection-of-collections and returns collection over the
+     * elements of the inner collections. This method doesn't create any
+     * new collections or copies any elements.
+     * <p>
+     * Note that due to non-copying nature of implementation, the
+     * {@link Collection#size() size()} method of resulting collection will have to
+     * iterate over all elements to produce size. Method {@link Collection#isEmpty() isEmpty()},
+     * however, is constant time and is much more preferable to use instead
+     * of {@code 'size()'} method when checking if list is not empty.
+     *
+     * @param c Input collection of collections.
+     * @param <T> Type of the inner collections.
+     * @return Iterable over the elements of the inner collections.
+     */
+    public static <T> Collection<T> flatCollections(@Nullable final Iterable<? extends Collection<T>> c) {
         return new GridSerializableCollection<T>() {
             @NotNull
             @Override public Iterator<T> iterator() {