You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/06/14 02:54:08 UTC

[3/4] ignite git commit: Refactor collecting partitions in VisorCache into separate task VisorCachePartitionsTask.

Refactor collecting partitions in VisorCache into separate task VisorCachePartitionsTask.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fad73bf5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fad73bf5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fad73bf5

Branch: refs/heads/master
Commit: fad73bf570760e697e20ea19c00c18bc5d6cfc6e
Parents: 96c599c
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Tue Jun 14 09:37:29 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Tue Jun 14 09:37:29 2016 +0700

----------------------------------------------------------------------
 .../ignite/internal/visor/cache/VisorCache.java |  56 +------
 .../visor/cache/VisorCachePartition.java        |  89 +++++++++++
 .../visor/cache/VisorCachePartitions.java       |  88 +++++++++++
 .../visor/cache/VisorCachePartitionsTask.java   | 152 +++++++++++++++++++
 .../internal/visor/cache/VisorCacheV3.java      |  68 +--------
 5 files changed, 339 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
index 1be7af8..f06813f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
@@ -18,24 +18,18 @@
 package org.apache.ignite.internal.visor.cache;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
@@ -97,10 +91,10 @@ public class VisorCache implements Serializable {
     /** Number of partitions. */
     private int partitions;
 
-    /** Primary partitions IDs with sizes. */
+    /** @deprecated Needed only for backward compatibility. */
     private Collection<IgnitePair<Integer>> primaryPartitions;
 
-    /** Backup partitions IDs with sizes. */
+    /** @deprecated Needed only for backward compatibility. */
     private Collection<IgnitePair<Integer>> backupPartitions;
 
     /** Cache metrics. */
@@ -162,53 +156,11 @@ public class VisorCache implements Serializable {
 
                     partitionsMap = new GridDhtPartitionMap(map2.nodeId(), map2.updateSequence(), map2.map());
                 }
-
-                List<GridDhtLocalPartition> parts = top.localPartitions();
-
-                primaryPartitions = new ArrayList<>(parts.size());
-                backupPartitions = new ArrayList<>(parts.size());
-
-                for (GridDhtLocalPartition part : parts) {
-                    int p = part.id();
-
-                    int sz = part.size();
-
-                    // Pass -1 as topology version in order not to wait for topology version.
-                    if (part.primary(AffinityTopologyVersion.NONE))
-                        primaryPartitions.add(new IgnitePair<>(p, sz));
-                    else if (part.state() == GridDhtPartitionState.OWNING && part.backup(AffinityTopologyVersion.NONE))
-                        backupPartitions.add(new IgnitePair<>(p, sz));
-                }
-            }
-            else {
-                // Old way of collecting partitions info.
-                ClusterNode node = ignite.cluster().localNode();
-
-                int[] pp = ca.affinity().primaryPartitions(node);
-
-                primaryPartitions= new ArrayList<>(pp.length);
-
-                for (int p : pp) {
-                    Set set = ca.entrySet(p);
-
-                    primaryPartitions.add(new IgnitePair<>(p, set != null ? set.size() : 0));
-                }
-
-                int[] bp = ca.affinity().backupPartitions(node);
-
-                backupPartitions = new ArrayList<>(bp.length);
-
-                for (int p : bp) {
-                    Set set = ca.entrySet(p);
-
-                    backupPartitions.add(new IgnitePair<>(p, set != null ? set.size() : 0));
-                }
             }
         }
 
         size = ca.size();
         nearSize = ca.nearSize();
-
         dynamicDeploymentId = ca.context().dynamicDeploymentId();
         dhtSize = size - nearSize;
         primarySize = ca.primarySize();
@@ -401,14 +353,14 @@ public class VisorCache implements Serializable {
     }
 
     /**
-     * @return Primary partitions IDs with sizes.
+     * @deprecated Needed only for backward compatibility.
      */
     public Collection<IgnitePair<Integer>> primaryPartitions() {
         return primaryPartitions;
     }
 
     /**
-     * @return Backup partitions IDs with sizes.
+     * @deprecated Needed only for backward compatibility.
      */
     public Collection<IgnitePair<Integer>> backupPartitions() {
         return backupPartitions;

http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartition.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartition.java
new file mode 100644
index 0000000..5909c1a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartition.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cache;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Data transfer object with the heap, offheap and swap key counts of a single cache partition.
+ */
+public class VisorCachePartition implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Partition id. */
+    private int part;
+
+    /** Number of keys in heap. */
+    private int heap;
+
+    /** Number of keys in offheap. */
+    private long offheap;
+
+    /** Number of keys in swap. */
+    private long swap;
+
+    /**
+     * Full constructor.
+     *
+     * @param part Partition id.
+     * @param heap Number of keys in heap.
+     * @param offheap Number of keys in offheap.
+     * @param swap Number of keys in swap.
+     */
+    public VisorCachePartition(int part, int heap, long offheap, long swap) {
+        this.part = part;
+        this.heap = heap;
+        this.offheap = offheap;
+        this.swap = swap;
+    }
+
+    /**
+     * @return Partition id.
+     */
+    public int partition() {
+        return part;
+    }
+
+    /**
+     * @return Number of keys in heap.
+     */
+    public int heap() {
+        return heap;
+    }
+
+    /**
+     * @return Number of keys in offheap.
+     */
+    public long offheap() {
+        return offheap;
+    }
+
+    /**
+     * @return Number of keys in swap.
+     */
+    public long swap() {
+        return swap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(VisorCachePartition.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitions.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitions.java
new file mode 100644
index 0000000..4634fa6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitions.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Data transfer object holding the primary and backup partition descriptors of a cache.
+ */
+public class VisorCachePartitions implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Descriptors of primary partitions. */
+    private List<VisorCachePartition> primary;
+
+    /** Descriptors of backup partitions. */
+    private List<VisorCachePartition> backup;
+
+    /**
+     * Default constructor that starts with empty primary and backup lists.
+     */
+    public VisorCachePartitions() {
+        primary = new ArrayList<>();
+        backup = new ArrayList<>();
+    }
+
+    /**
+     * Add primary partition descriptor.
+     *
+     * @param part Partition id.
+     * @param heap Number of primary keys in heap.
+     * @param offheap Number of primary keys in offheap.
+     * @param swap Number of primary keys in swap.
+     */
+    public void addPrimary(int part, int heap, long offheap, long swap) {
+       primary.add(new VisorCachePartition(part, heap, offheap, swap));
+    }
+
+    /**
+     * Add backup partition descriptor.
+     *
+     * @param part Partition id.
+     * @param heap Number of backup keys in heap.
+     * @param offheap Number of backup keys in offheap.
+     * @param swap Number of backup keys in swap.
+     */
+    public void addBackup(int part, int heap, long offheap, long swap) {
+       backup.add(new VisorCachePartition(part, heap, offheap, swap));
+    }
+
+    /**
+     * @return List of primary partitions (the internal list, not a copy).
+     */
+    public List<VisorCachePartition> primary() {
+        return primary;
+    }
+
+    /**
+     * @return List of backup partitions (the internal list, not a copy).
+     */
+    public List<VisorCachePartition> backup() {
+        return backup;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(VisorCachePartitions.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java
new file mode 100644
index 0000000..80836c6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cache;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorMultiNodeTask;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.visor.util.VisorTaskUtils.log;
+import static org.apache.ignite.internal.visor.util.VisorTaskUtils.escapeName;
+
+/**
+ * Task that collects the distribution of keys across partitions of a cache, keyed by node id.
+ */
+@GridInternal
+public class VisorCachePartitionsTask extends VisorMultiNodeTask<String, Map<UUID, VisorCachePartitions>, VisorCachePartitions> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override protected VisorCachePartitionsJob job(String arg) {
+        return new VisorCachePartitionsJob(arg, debug);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override protected Map<UUID, VisorCachePartitions> reduce0(List<ComputeJobResult> results) {
+        Map<UUID, VisorCachePartitions> parts = new HashMap<>();
+
+        for (ComputeJobResult res : results) {
+            if (res.getException() != null)
+                throw res.getException();
+
+            parts.put(res.getNode().id(), (VisorCachePartitions)res.getData());
+        }
+
+        return parts;
+    }
+
+    /**
+     * Job that collects heap, offheap and swap key counts of local primary and backup partitions.
+     */
+    private static class VisorCachePartitionsJob extends VisorJob<String, VisorCachePartitions> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Create job with given argument.
+         *
+         * @param cacheName Cache name.
+         * @param debug Debug flag.
+         */
+        private VisorCachePartitionsJob(String cacheName, boolean debug) {
+            super(cacheName, debug);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected VisorCachePartitions run(final String cacheName) throws IgniteException {
+            if (debug)
+                log(ignite.log(), "Collecting partitions for cache: " + escapeName(cacheName));
+
+            VisorCachePartitions parts = new VisorCachePartitions();
+
+            GridCacheAdapter ca = ignite.context().cache().internalCache(cacheName);
+
+            // Cache is missing or was not started: return the empty result.
+            if (ca == null || !ca.context().started())
+                return parts;
+
+            CacheConfiguration cfg = ca.configuration();
+
+            CacheMode mode = cfg.getCacheMode();
+
+            boolean partitioned = (mode == CacheMode.PARTITIONED || mode == CacheMode.REPLICATED)
+                && ca.context().affinityNode();
+
+            if (partitioned) {
+                GridCacheSwapManager swap = ca.context().swap();
+
+                GridDhtCacheAdapter dca = null;
+
+                if (ca instanceof GridNearCacheAdapter)
+                    dca = ((GridNearCacheAdapter)ca).dht();
+                else if (ca instanceof GridDhtCacheAdapter)
+                    dca = (GridDhtCacheAdapter)ca;
+
+                if (dca != null) {
+                    GridDhtPartitionTopology top = dca.topology();
+
+                    List<GridDhtLocalPartition> locParts = top.localPartitions();
+
+                    try {
+                        for (GridDhtLocalPartition part : locParts) {
+                            int p = part.id();
+
+                            int sz = part.size();
+
+                            // Pass AffinityTopologyVersion.NONE in order not to wait for topology version.
+                            if (part.primary(AffinityTopologyVersion.NONE))
+                                parts.addPrimary(p, sz, swap.offheapEntriesCount(p), swap.swapEntriesCount(p));
+                            else if (part.state() == GridDhtPartitionState.OWNING && part.backup(AffinityTopologyVersion.NONE))
+                                parts.addBackup(p, sz, swap.offheapEntriesCount(p), swap.swapEntriesCount(p));
+                        }
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException("Failed to collect keys distribution in partitions", e);
+                    }
+                }
+            }
+
+            return parts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(VisorCachePartitionsJob.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
index bd9a3ce..fab37e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
@@ -17,90 +17,34 @@
 
 package org.apache.ignite.internal.visor.cache;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cache.GridCacheSwapManager;
 import org.apache.ignite.internal.util.lang.GridTuple3;
-import org.apache.ignite.internal.util.lang.IgnitePair;
 
 /**
  * Data transfer object for {@link IgniteCache}.
+ *
+ * @deprecated Needed only for backward compatibility.
  */
 public class VisorCacheV3 extends VisorCacheV2 {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Primary partitions IDs with offheap and swap entries count. */
+    /** @deprecated Needed only for backward compatibility. */
     private Collection<GridTuple3<Integer, Long, Long>> primaryPartsOffheapSwap;
 
-    /** Backup partitions IDs with offheap and swap entries count. */
+    /** @deprecated Needed only for backward compatibility. */
     private Collection<GridTuple3<Integer, Long, Long>> backupPartsOffheapSwap;
 
-    /** {@inheritDoc} */
-    @Override public VisorCache from(IgniteEx ignite, String cacheName, int sample) throws IgniteCheckedException {
-        VisorCache c = super.from(ignite, cacheName, sample);
-
-        if (c != null && c instanceof VisorCacheV3) {
-            VisorCacheV3 cacheV3 = (VisorCacheV3)c;
-
-            GridCacheAdapter ca = ignite.context().cache().internalCache(cacheName);
-
-            // Process only started caches.
-            if (ca != null && ca.context().started()) {
-                GridCacheSwapManager swap = ca.context().swap();
-
-                cacheV3.primaryPartsOffheapSwap = new ArrayList<>(c.primaryPartitions().size());
-
-                for (IgnitePair<Integer> part: c.primaryPartitions()) {
-                    int p = part.get1();
-
-                    cacheV3.primaryPartsOffheapSwap.add(new GridTuple3<>(p, swap.offheapEntriesCount(p), swap.swapEntriesCount(p)));
-                }
-
-                cacheV3.backupPartsOffheapSwap = new ArrayList<>(c.backupPartitions().size());
-
-                for (IgnitePair<Integer> part: c.backupPartitions()) {
-                    int p = part.get1();
-
-                    cacheV3.backupPartsOffheapSwap.add(new GridTuple3<>(p, swap.offheapEntriesCount(p), swap.swapEntriesCount(p)));
-                }
-            }
-        }
-
-        return c;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected VisorCache initHistory(VisorCache c) {
-        super.initHistory(c);
-
-        if (c instanceof VisorCacheV3) {
-            ((VisorCacheV3)c).primaryPartsOffheapSwap = Collections.emptyList();
-            ((VisorCacheV3)c).backupPartsOffheapSwap = Collections.emptyList();
-        }
-
-        return c;
-    }
-
-    /** {@inheritDoc} */
-    @Override public VisorCache history() {
-        return initHistory(new VisorCacheV3());
-    }
-
     /**
-     * @return Collection with primary partitions IDs and offheap and swap entries count.
+     * @deprecated Needed only for backward compatibility.
      */
     public Collection<GridTuple3<Integer, Long, Long>> primaryPartitionsOffheapSwap() {
         return primaryPartsOffheapSwap;
     }
 
     /**
-     * @return Collection with backup partitions IDs and offheap and swap entries count.
+     * @deprecated Needed only for backward compatibility.
      */
     public Collection<GridTuple3<Integer, Long, Long>> backupPartitionsOffheapSwap() {
         return backupPartsOffheapSwap;