You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/06/14 02:54:08 UTC
[3/4] ignite git commit: Refactor collecting partitions in VisorCache
into separate task VisorCachePartitionsTask.
Refactor collecting partitions in VisorCache into separate task VisorCachePartitionsTask.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fad73bf5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fad73bf5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fad73bf5
Branch: refs/heads/master
Commit: fad73bf570760e697e20ea19c00c18bc5d6cfc6e
Parents: 96c599c
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Tue Jun 14 09:37:29 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Tue Jun 14 09:37:29 2016 +0700
----------------------------------------------------------------------
.../ignite/internal/visor/cache/VisorCache.java | 56 +------
.../visor/cache/VisorCachePartition.java | 89 +++++++++++
.../visor/cache/VisorCachePartitions.java | 88 +++++++++++
.../visor/cache/VisorCachePartitionsTask.java | 152 +++++++++++++++++++
.../internal/visor/cache/VisorCacheV3.java | 68 +--------
5 files changed, 339 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
index 1be7af8..f06813f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
@@ -18,24 +18,18 @@
package org.apache.ignite.internal.visor.cache;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
import java.util.Set;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
@@ -97,10 +91,10 @@ public class VisorCache implements Serializable {
/** Number of partitions. */
private int partitions;
- /** Primary partitions IDs with sizes. */
+ /** @deprecated Needed only for backward compatibility. */
private Collection<IgnitePair<Integer>> primaryPartitions;
- /** Backup partitions IDs with sizes. */
+ /** @deprecated Needed only for backward compatibility. */
private Collection<IgnitePair<Integer>> backupPartitions;
/** Cache metrics. */
@@ -162,53 +156,11 @@ public class VisorCache implements Serializable {
partitionsMap = new GridDhtPartitionMap(map2.nodeId(), map2.updateSequence(), map2.map());
}
-
- List<GridDhtLocalPartition> parts = top.localPartitions();
-
- primaryPartitions = new ArrayList<>(parts.size());
- backupPartitions = new ArrayList<>(parts.size());
-
- for (GridDhtLocalPartition part : parts) {
- int p = part.id();
-
- int sz = part.size();
-
- // Pass -1 as topology version in order not to wait for topology version.
- if (part.primary(AffinityTopologyVersion.NONE))
- primaryPartitions.add(new IgnitePair<>(p, sz));
- else if (part.state() == GridDhtPartitionState.OWNING && part.backup(AffinityTopologyVersion.NONE))
- backupPartitions.add(new IgnitePair<>(p, sz));
- }
- }
- else {
- // Old way of collecting partitions info.
- ClusterNode node = ignite.cluster().localNode();
-
- int[] pp = ca.affinity().primaryPartitions(node);
-
- primaryPartitions= new ArrayList<>(pp.length);
-
- for (int p : pp) {
- Set set = ca.entrySet(p);
-
- primaryPartitions.add(new IgnitePair<>(p, set != null ? set.size() : 0));
- }
-
- int[] bp = ca.affinity().backupPartitions(node);
-
- backupPartitions = new ArrayList<>(bp.length);
-
- for (int p : bp) {
- Set set = ca.entrySet(p);
-
- backupPartitions.add(new IgnitePair<>(p, set != null ? set.size() : 0));
- }
}
}
size = ca.size();
nearSize = ca.nearSize();
-
dynamicDeploymentId = ca.context().dynamicDeploymentId();
dhtSize = size - nearSize;
primarySize = ca.primarySize();
@@ -401,14 +353,14 @@ public class VisorCache implements Serializable {
}
/**
- * @return Primary partitions IDs with sizes.
+ * @deprecated Needed only for backward compatibility.
*/
public Collection<IgnitePair<Integer>> primaryPartitions() {
return primaryPartitions;
}
/**
- * @return Backup partitions IDs with sizes.
+ * @deprecated Needed only for backward compatibility.
*/
public Collection<IgnitePair<Integer>> backupPartitions() {
return backupPartitions;
http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartition.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartition.java
new file mode 100644
index 0000000..5909c1a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartition.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cache;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Data transfer object holding the number of keys stored in heap, offheap and swap for one cache partition.
+ */
+public class VisorCachePartition implements Serializable {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** Partition id. */
+ private int part;
+
+ /** Number of keys in heap. */
+ private int heap;
+
+ /** Number of keys in offheap. */
+ private long offheap;
+
+ /** Number of keys in swap. */
+ private long swap;
+
+ /**
+ * Creates a partition descriptor with all of its fields set.
+ *
+ * @param part Partition id.
+ * @param heap Number of keys in heap.
+ * @param offheap Number of keys in offheap.
+ * @param swap Number of keys in swap.
+ */
+ public VisorCachePartition(int part, int heap, long offheap, long swap) {
+ this.part = part;
+ this.heap = heap;
+ this.offheap = offheap;
+ this.swap = swap;
+ }
+
+ /**
+ * @return Partition id.
+ */
+ public int partition() {
+ return part;
+ }
+
+ /**
+ * @return Number of keys in heap.
+ */
+ public int heap() {
+ return heap;
+ }
+
+ /**
+ * @return Number of keys in offheap.
+ */
+ public long offheap() {
+ return offheap;
+ }
+
+ /**
+ * @return Number of keys in swap.
+ */
+ public long swap() {
+ return swap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(VisorCachePartition.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitions.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitions.java
new file mode 100644
index 0000000..4634fa6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitions.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Data transfer object holding per-partition key counts, kept in separate lists for primary and backup partitions.
+ */
+public class VisorCachePartitions implements Serializable {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** Descriptors of primary partitions. */
+ private List<VisorCachePartition> primary;
+
+ /** Descriptors of backup partitions. */
+ private List<VisorCachePartition> backup;
+
+ /**
+ * Creates an instance with empty lists of primary and backup partitions.
+ */
+ public VisorCachePartitions() {
+ primary = new ArrayList<>();
+ backup = new ArrayList<>();
+ }
+
+ /**
+ * Add primary partition descriptor.
+ *
+ * @param part Partition id.
+ * @param heap Number of primary keys in heap.
+ * @param offheap Number of primary keys in offheap.
+ * @param swap Number of primary keys in swap.
+ */
+ public void addPrimary(int part, int heap, long offheap, long swap) {
+ primary.add(new VisorCachePartition(part, heap, offheap, swap));
+ }
+
+ /**
+ * Add backup partition descriptor.
+ *
+ * @param part Partition id.
+ * @param heap Number of backup keys in heap.
+ * @param offheap Number of backup keys in offheap.
+ * @param swap Number of backup keys in swap.
+ */
+ public void addBackup(int part, int heap, long offheap, long swap) {
+ backup.add(new VisorCachePartition(part, heap, offheap, swap));
+ }
+
+ /**
+ * @return List of primary partition descriptors (the internal list itself, not a copy).
+ */
+ public List<VisorCachePartition> primary() {
+ return primary;
+ }
+
+ /**
+ * @return List of backup partition descriptors (the internal list itself, not a copy).
+ */
+ public List<VisorCachePartition> backup() {
+ return backup;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(VisorCachePartitions.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java
new file mode 100644
index 0000000..80836c6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cache;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorMultiNodeTask;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.visor.util.VisorTaskUtils.log;
+import static org.apache.ignite.internal.visor.util.VisorTaskUtils.escapeName;
+
+/**
+ * Task that collects keys distribution in partitions of the named cache; the result maps node id to its partitions.
+ */
+@GridInternal
+public class VisorCachePartitionsTask extends VisorMultiNodeTask<String, Map<UUID, VisorCachePartitions>, VisorCachePartitions> {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override protected VisorCachePartitionsJob job(String arg) {
+ return new VisorCachePartitionsJob(arg, debug);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override protected Map<UUID, VisorCachePartitions> reduce0(List<ComputeJobResult> results) {
+ Map<UUID, VisorCachePartitions> parts = new HashMap<>();
+
+ for (ComputeJobResult res : results) {
+ if (res.getException() != null)
+ throw res.getException();
+
+ parts.put(res.getNode().id(), (VisorCachePartitions)res.getData());
+ }
+
+ return parts;
+ }
+
+ /**
+ * Job that collects keys distribution in local partitions of the given cache.
+ */
+ private static class VisorCachePartitionsJob extends VisorJob<String, VisorCachePartitions> {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Create job with given argument.
+ *
+ * @param cacheName Cache name.
+ * @param debug Debug flag.
+ */
+ private VisorCachePartitionsJob(String cacheName, boolean debug) {
+ super(cacheName, debug);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected VisorCachePartitions run(final String cacheName) throws IgniteException {
+ if (debug)
+ log(ignite.log(), "Collecting partitions for cache: " + escapeName(cacheName));
+
+ VisorCachePartitions parts = new VisorCachePartitions();
+
+ GridCacheAdapter ca = ignite.context().cache().internalCache(cacheName);
+
+ // Cache is missing or was not started: return the empty result.
+ if (ca == null || !ca.context().started())
+ return parts;
+
+ CacheConfiguration cfg = ca.configuration();
+
+ CacheMode mode = cfg.getCacheMode();
+
+ boolean partitioned = (mode == CacheMode.PARTITIONED || mode == CacheMode.REPLICATED)
+ && ca.context().affinityNode();
+
+ if (partitioned) {
+ GridCacheSwapManager swap = ca.context().swap();
+
+ GridDhtCacheAdapter dca = null;
+
+ if (ca instanceof GridNearCacheAdapter)
+ dca = ((GridNearCacheAdapter)ca).dht();
+ else if (ca instanceof GridDhtCacheAdapter)
+ dca = (GridDhtCacheAdapter)ca;
+
+ if (dca != null) {
+ GridDhtPartitionTopology top = dca.topology();
+
+ List<GridDhtLocalPartition> locParts = top.localPartitions();
+
+ try {
+ for (GridDhtLocalPartition part : locParts) {
+ int p = part.id();
+
+ int sz = part.size();
+
+ // Pass AffinityTopologyVersion.NONE in order not to wait for topology version.
+ if (part.primary(AffinityTopologyVersion.NONE))
+ parts.addPrimary(p, sz, swap.offheapEntriesCount(p), swap.swapEntriesCount(p));
+ else if (part.state() == GridDhtPartitionState.OWNING && part.backup(AffinityTopologyVersion.NONE))
+ parts.addBackup(p, sz, swap.offheapEntriesCount(p), swap.swapEntriesCount(p));
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to collect keys distribution in partitions", e);
+ }
+ }
+ }
+
+ return parts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(VisorCachePartitionsJob.class, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
index bd9a3ce..fab37e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
@@ -17,90 +17,34 @@
package org.apache.ignite.internal.visor.cache;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cache.GridCacheSwapManager;
import org.apache.ignite.internal.util.lang.GridTuple3;
-import org.apache.ignite.internal.util.lang.IgnitePair;
/**
* Data transfer object for {@link IgniteCache}.
+ *
+ * @deprecated Needed only for backward compatibility.
*/
public class VisorCacheV3 extends VisorCacheV2 {
/** */
private static final long serialVersionUID = 0L;
- /** Primary partitions IDs with offheap and swap entries count. */
+ /** @deprecated Needed only for backward compatibility. */
private Collection<GridTuple3<Integer, Long, Long>> primaryPartsOffheapSwap;
- /** Backup partitions IDs with offheap and swap entries count. */
+ /** @deprecated Needed only for backward compatibility. */
private Collection<GridTuple3<Integer, Long, Long>> backupPartsOffheapSwap;
- /** {@inheritDoc} */
- @Override public VisorCache from(IgniteEx ignite, String cacheName, int sample) throws IgniteCheckedException {
- VisorCache c = super.from(ignite, cacheName, sample);
-
- if (c != null && c instanceof VisorCacheV3) {
- VisorCacheV3 cacheV3 = (VisorCacheV3)c;
-
- GridCacheAdapter ca = ignite.context().cache().internalCache(cacheName);
-
- // Process only started caches.
- if (ca != null && ca.context().started()) {
- GridCacheSwapManager swap = ca.context().swap();
-
- cacheV3.primaryPartsOffheapSwap = new ArrayList<>(c.primaryPartitions().size());
-
- for (IgnitePair<Integer> part: c.primaryPartitions()) {
- int p = part.get1();
-
- cacheV3.primaryPartsOffheapSwap.add(new GridTuple3<>(p, swap.offheapEntriesCount(p), swap.swapEntriesCount(p)));
- }
-
- cacheV3.backupPartsOffheapSwap = new ArrayList<>(c.backupPartitions().size());
-
- for (IgnitePair<Integer> part: c.backupPartitions()) {
- int p = part.get1();
-
- cacheV3.backupPartsOffheapSwap.add(new GridTuple3<>(p, swap.offheapEntriesCount(p), swap.swapEntriesCount(p)));
- }
- }
- }
-
- return c;
- }
-
- /** {@inheritDoc} */
- @Override protected VisorCache initHistory(VisorCache c) {
- super.initHistory(c);
-
- if (c instanceof VisorCacheV3) {
- ((VisorCacheV3)c).primaryPartsOffheapSwap = Collections.emptyList();
- ((VisorCacheV3)c).backupPartsOffheapSwap = Collections.emptyList();
- }
-
- return c;
- }
-
- /** {@inheritDoc} */
- @Override public VisorCache history() {
- return initHistory(new VisorCacheV3());
- }
-
/**
- * @return Collection with primary partitions IDs and offheap and swap entries count.
+ * @deprecated Needed only for backward compatibility.
*/
public Collection<GridTuple3<Integer, Long, Long>> primaryPartitionsOffheapSwap() {
return primaryPartsOffheapSwap;
}
/**
- * @return Collection with backup partitions IDs and offheap and swap entries count.
+ * @deprecated Needed only for backward compatibility.
*/
public Collection<GridTuple3<Integer, Long, Long>> backupPartitionsOffheapSwap() {
return backupPartsOffheapSwap;