You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2015/12/01 23:22:12 UTC
[48/50] [abbrv] incubator-geode git commit: GEODE-581: LoadProbe that
balances based on bucket count
GEODE-581: LoadProbe that balances based on bucket count
Adding a load probe that balances partitioned regions based on bucket
count, rather than on the amount of data in bytes on each node.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/442faa06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/442faa06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/442faa06
Branch: refs/heads/feature/GEODE-291
Commit: 442faa06f3b3a7cb85065ee5b0585ed679c80ba0
Parents: bff59d1
Author: Dan Smith <ds...@pivotal.io>
Authored: Thu Dec 2 19:44:38 2010 +0000
Committer: Dan Smith <up...@apache.org>
Committed: Tue Dec 1 13:21:48 2015 -0800
----------------------------------------------------------------------
.../gemstone/gemfire/internal/DSFIDFactory.java | 2 +
.../internal/DataSerializableFixedID.java | 3 +-
.../cache/control/InternalResourceManager.java | 11 +-
.../cache/partitioned/BucketCountLoadProbe.java | 75 ++++++++++
.../control/RebalanceOperationDUnitTest.java | 141 ++++++++++++++++++-
5 files changed, 227 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/442faa06/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
index 24dd181..b77dfdb 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
@@ -302,6 +302,7 @@ import com.gemstone.gemfire.internal.cache.partitioned.AllBucketProfilesUpdateMe
import com.gemstone.gemfire.internal.cache.partitioned.BecomePrimaryBucketMessage;
import com.gemstone.gemfire.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketReplyMessage;
import com.gemstone.gemfire.internal.cache.partitioned.BucketBackupMessage;
+import com.gemstone.gemfire.internal.cache.partitioned.BucketCountLoadProbe;
import com.gemstone.gemfire.internal.cache.partitioned.BucketProfileUpdateMessage;
import com.gemstone.gemfire.internal.cache.partitioned.BucketSizeMessage;
import com.gemstone.gemfire.internal.cache.partitioned.BucketSizeMessage.BucketSizeReplyMessage;
@@ -1052,6 +1053,7 @@ public final class DSFIDFactory implements DataSerializableFixedID {
DestroyRegionOnDataStoreMessage.class);
registerDSFID(SHUTDOWN_ALL_GATEWAYHUBS_REQUEST,
ShutdownAllGatewayHubsRequest.class);
+ registerDSFID(BUCKET_COUNT_LOAD_PROBE, BucketCountLoadProbe.class);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/442faa06/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
index 8dd8f00..7b263bf 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
@@ -746,8 +746,9 @@ public interface DataSerializableFixedID extends SerializationVersions {
public static final short SERIAL_ACKED_MESSAGE = 2001;
public static final short CLIENT_DATASERIALIZER_MESSAGE=2002;
- //2003..2099 unused
+ //2003..2098 unused
+ public static final short BUCKET_COUNT_LOAD_PROBE = 2099;
public static final short PERSISTENT_MEMBERSHIP_VIEW_REQUEST =2100;
public static final short PERSISTENT_MEMBERSHIP_VIEW_REPLY = 2101;
public static final short PERSISTENT_STATE_QUERY_REQUEST = 2102;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/442faa06/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
index 98e1f25..ef9c502 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
@@ -89,7 +89,7 @@ public class InternalResourceManager implements ResourceManager {
final GemFireCacheImpl cache;
- private final LoadProbe loadProbe;
+ private LoadProbe loadProbe;
private final ResourceManagerStats stats;
private final ResourceAdvisor resourceAdvisor;
@@ -545,6 +545,15 @@ public class InternalResourceManager implements ResourceManager {
return this.loadProbe;
}
+ /**
+ * This method is for test purposes only.
+ */
+ public LoadProbe setLoadProbe(LoadProbe probe) {
+ LoadProbe old = this.loadProbe;
+ this.loadProbe = probe;
+ return old;
+ }
+
/* (non-Javadoc)
* @see com.gemstone.gemfire.cache.control.ResourceManager#setEvictionHeapPercentage(int)
*/
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/442faa06/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/BucketCountLoadProbe.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/BucketCountLoadProbe.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/BucketCountLoadProbe.java
new file mode 100644
index 0000000..07c9a1f
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/BucketCountLoadProbe.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.partitioned;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.BucketAdvisor;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
+
+/**
+ * A load probe which calculates the load of a partitioned region using
+ * just the number of buckets on a member.
+ *
+ */
+public class BucketCountLoadProbe implements LoadProbe, DataSerializableFixedID {
+ private static final long serialVersionUID = 7040814060882774875L;
+
+ public PRLoad getLoad(PartitionedRegion pr) {
+ PartitionedRegionDataStore ds = pr.getDataStore();
+ int configuredBucketCount = pr.getTotalNumberOfBuckets();
+ PRLoad prLoad = new PRLoad(
+ configuredBucketCount, pr.getLocalMaxMemory());
+
+ // every local bucket counts as a load of 1, regardless of its size in bytes
+ for(Integer bidInt : ds.getAllLocalBucketIds()) {
+ int bid = bidInt.intValue();
+
+ BucketAdvisor bucketAdvisor = pr.getRegionAdvisor().
+ getBucket(bid).getBucketAdvisor();
+ //Wait for a primary to exist for this bucket, because
+ //it might be this member.
+ bucketAdvisor.getPrimary();
+ boolean isPrimary = pr.getRegionAdvisor().
+ getBucket(bid).getBucketAdvisor().isPrimary();
+ prLoad.addBucket(bid, 1, isPrimary ? 1 : 0);
+ }
+
+ return prLoad;
+ }
+
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ }
+
+ public void toData(DataOutput out) throws IOException {
+ }
+
+ public int getDSFID() {
+ return BUCKET_COUNT_LOAD_PROBE;
+ }
+
+ @Override
+ public Version[] getSerializationVersions() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/442faa06/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
index 9029b8d..88516fe 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
@@ -43,6 +43,7 @@ import com.gemstone.gemfire.cache.DiskStoreFactory;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.EvictionAction;
import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.GemFireCache;
import com.gemstone.gemfire.cache.LoaderHelper;
import com.gemstone.gemfire.cache.PartitionAttributes;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
@@ -66,9 +67,12 @@ import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.ColocationHelper;
import com.gemstone.gemfire.internal.cache.DiskStoreImpl;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.InternalCache;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceObserverAdapter;
+import com.gemstone.gemfire.internal.cache.partitioned.BucketCountLoadProbe;
+import com.gemstone.gemfire.internal.cache.partitioned.LoadProbe;
import dunit.AsyncInvocation;
import dunit.Host;
@@ -2810,13 +2814,144 @@ public class RebalanceOperationDUnitTest extends CacheTestCase {
moveBucketsWithUnrecoveredValuesRedundancy(false);
}
+ public void testBalanceBucketsByCountSimulation() {
+ balanceBucketsByCount(true);
+ }
+
+ public void testBalanceBucketsByCount() {
+ balanceBucketsByCount(false);
+ }
+
/**
* Check to make sure that we balance
- * buckets between two hosts with no redundancy,
- *
- * even if the values have not yet been faulted in from disk.
+ * buckets between two hosts with no redundancy.
* @param simulate
*/
+ public void balanceBucketsByCount(final boolean simulate) {
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ LoadProbe oldProbe = setLoadProbe(vm0, new BucketCountLoadProbe());
+ try {
+ SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
+ public void run()
+ {
+ Cache cache = getCache();
+ AttributesFactory attr = new AttributesFactory();
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(0);
+ paf.setRecoveryDelay(-1);
+ paf.setStartupRecoveryDelay(-1);
+ PartitionAttributes prAttr = paf.create();
+ attr.setPartitionAttributes(prAttr);
+ attr.setCacheLoader(new Bug40228Loader());
+ cache.createRegion("region1", attr.create());
+ }
+ };
+
+ //Create the region in only 1 VM
+ vm0.invoke(createPrRegion);
+
+ //Create some buckets with very uneven sizes
+ vm0.invoke(new SerializableRunnable("createSomeBuckets") {
+
+ public void run() {
+ Cache cache = getCache();
+ Region region = cache.getRegion("region1");
+ region.put(Integer.valueOf(1), new byte[1024 * 1024]);
+ region.put(Integer.valueOf(2), "A");
+ region.put(Integer.valueOf(3), "A");
+ region.put(Integer.valueOf(4), "A");
+ region.put(Integer.valueOf(5), "A");
+ region.put(Integer.valueOf(6), "A");
+ }
+ });
+
+ //Create the region in the other VM (should have no effect)
+ vm1.invoke(createPrRegion);
+
+ //Now rebalance (or only simulate a rebalance, depending on the simulate flag)
+ vm0.invoke(new SerializableRunnable("simulateRebalance") {
+
+ public void run() {
+ Cache cache = getCache();
+ ResourceManager manager = cache.getResourceManager();
+ RebalanceResults results = doRebalance(simulate, manager);
+ assertEquals(0, results.getTotalBucketCreatesCompleted());
+ assertEquals(0, results.getTotalPrimaryTransfersCompleted());
+ assertEquals(3, results.getTotalBucketTransfersCompleted());
+ assertTrue(0 < results.getTotalBucketTransferBytes());
+ Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
+ assertEquals(1, detailSet.size());
+ PartitionRebalanceInfo details = detailSet.iterator().next();
+ assertEquals(0, details.getBucketCreatesCompleted());
+ assertEquals(0, details.getPrimaryTransfersCompleted());
+ assertTrue(0 < details.getBucketTransferBytes());
+ assertEquals(3, details.getBucketTransfersCompleted());
+
+ Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
+ assertEquals(2, afterDetails.size());
+ for(PartitionMemberInfo memberDetails: afterDetails) {
+ assertEquals(3, memberDetails.getBucketCount());
+ assertEquals(3, memberDetails.getPrimaryCount());
+ }
+ if(!simulate) {
+ verifyStats(manager, results);
+ }
+ }
+ });
+
+ if(!simulate) {
+ SerializableRunnable checkRedundancyFixed = new SerializableRunnable("checkRedundancyFixed") {
+
+ public void run() {
+ Cache cache = getCache();
+ Region region = cache.getRegion("region1");
+ PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
+ assertEquals(6, details.getCreatedBucketCount());
+ assertEquals(0,details.getActualRedundantCopies());
+ assertEquals(0,details.getLowRedundancyBucketCount());
+ assertEquals(2, details.getPartitionMemberInfo().size());
+ for(PartitionMemberInfo memberDetails: details.getPartitionMemberInfo()) {
+ assertEquals(3, memberDetails.getBucketCount());
+ assertEquals(3, memberDetails.getPrimaryCount());
+ }
+
+ //check to make sure that moving buckets didn't close the cache loader
+ Bug40228Loader loader = (Bug40228Loader) cache.getRegion("region1").getAttributes().getCacheLoader();
+ assertFalse(loader.isClosed());
+ }
+ };
+
+ vm0.invoke(checkRedundancyFixed);
+ vm1.invoke(checkRedundancyFixed);
+ }
+ } finally {
+ setLoadProbe(vm0, oldProbe);
+ }
+ }
+
+ private LoadProbe setLoadProbe(VM vm, final LoadProbe probe) {
+ LoadProbe oldProbe = (LoadProbe) vm.invoke(new SerializableCallable("set load probe") {
+
+ public Object call() {
+ GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
+ InternalResourceManager mgr = cache.getResourceManager();
+ return mgr.setLoadProbe(probe);
+ }
+ });
+
+ return oldProbe;
+ }
+
+ /**
+ * Check to make sure that we balance
+ * buckets between two hosts with no redundancy,
+ *
+ * even if the values have not yet been faulted in from disk.
+ * @param simulate
+ */
public void moveBucketsWithUnrecoveredValuesRedundancy(final boolean simulate) {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);