You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2015/12/01 22:27:36 UTC

incubator-geode git commit: GEODE-581: LoadProbe that balances based on bucket count

Repository: incubator-geode
Updated Branches:
  refs/heads/develop bff59d154 -> 442faa06f


GEODE-581: LoadProbe that balances based on bucket count

Adding a load probe that balances partitioned regions based on bucket
count, rather than on the amount of data in bytes on each node.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/442faa06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/442faa06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/442faa06

Branch: refs/heads/develop
Commit: 442faa06f3b3a7cb85065ee5b0585ed679c80ba0
Parents: bff59d1
Author: Dan Smith <ds...@pivotal.io>
Authored: Thu Dec 2 19:44:38 2010 +0000
Committer: Dan Smith <up...@apache.org>
Committed: Tue Dec 1 13:21:48 2015 -0800

----------------------------------------------------------------------
 .../gemstone/gemfire/internal/DSFIDFactory.java |   2 +
 .../internal/DataSerializableFixedID.java       |   3 +-
 .../cache/control/InternalResourceManager.java  |  11 +-
 .../cache/partitioned/BucketCountLoadProbe.java |  75 ++++++++++
 .../control/RebalanceOperationDUnitTest.java    | 141 ++++++++++++++++++-
 5 files changed, 227 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/442faa06/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
index 24dd181..b77dfdb 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
@@ -302,6 +302,7 @@ import com.gemstone.gemfire.internal.cache.partitioned.AllBucketProfilesUpdateMe
 import com.gemstone.gemfire.internal.cache.partitioned.BecomePrimaryBucketMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketReplyMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.BucketBackupMessage;
+import com.gemstone.gemfire.internal.cache.partitioned.BucketCountLoadProbe;
 import com.gemstone.gemfire.internal.cache.partitioned.BucketProfileUpdateMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.BucketSizeMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.BucketSizeMessage.BucketSizeReplyMessage;
@@ -1052,6 +1053,7 @@ public final class DSFIDFactory implements DataSerializableFixedID {
         DestroyRegionOnDataStoreMessage.class);
     registerDSFID(SHUTDOWN_ALL_GATEWAYHUBS_REQUEST,
         ShutdownAllGatewayHubsRequest.class);
+    registerDSFID(BUCKET_COUNT_LOAD_PROBE, BucketCountLoadProbe.class);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/442faa06/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
index 8dd8f00..7b263bf 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
@@ -746,8 +746,9 @@ public interface DataSerializableFixedID extends SerializationVersions {
   public static final short SERIAL_ACKED_MESSAGE = 2001;
   public static final short CLIENT_DATASERIALIZER_MESSAGE=2002;
   
-  //2003..2099 unused
+  //2003..2098 unused
   
+  public static final short BUCKET_COUNT_LOAD_PROBE = 2099;
   public static final short PERSISTENT_MEMBERSHIP_VIEW_REQUEST =2100;
   public static final short PERSISTENT_MEMBERSHIP_VIEW_REPLY = 2101;
   public static final short PERSISTENT_STATE_QUERY_REQUEST = 2102;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/442faa06/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
index 98e1f25..ef9c502 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
@@ -89,7 +89,7 @@ public class InternalResourceManager implements ResourceManager {
 
   final GemFireCacheImpl cache;
   
-  private final LoadProbe loadProbe;
+  private LoadProbe loadProbe;
   
   private final ResourceManagerStats stats;
   private final ResourceAdvisor resourceAdvisor;
@@ -545,6 +545,15 @@ public class InternalResourceManager implements ResourceManager {
     return this.loadProbe;
   }
 
+  /**
+   * Swaps in the given load probe and hands back the one that was
+   * installed before, so that the caller can put it back afterwards.
+   * This method is for test purposes only.
+   *
+   * @param probe the load probe to install
+   * @return the load probe that was installed before this call
+   */
+  public LoadProbe setLoadProbe(LoadProbe probe) {
+    final LoadProbe previous = this.loadProbe;
+    this.loadProbe = probe;
+    return previous;
+  }
+
   /* (non-Javadoc)
    * @see com.gemstone.gemfire.cache.control.ResourceManager#setEvictionHeapPercentage(int)
    */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/442faa06/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/BucketCountLoadProbe.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/BucketCountLoadProbe.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/BucketCountLoadProbe.java
new file mode 100644
index 0000000..07c9a1f
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/BucketCountLoadProbe.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.partitioned;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.BucketAdvisor;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
+
+/**
+ * A load probe which calculates the load of a partitioned region using
+ * just the number of buckets on a member, rather than the amount of data
+ * in bytes held by those buckets.
+ */
+public class BucketCountLoadProbe implements LoadProbe, DataSerializableFixedID {
+  private static final long serialVersionUID = 7040814060882774875L;
+
+  /**
+   * Builds the load of the given region on this member, adding one entry
+   * for every bucket id reported by the local data store.
+   *
+   * @param pr the partitioned region to measure
+   * @return the load, created from the region's total bucket count and
+   *         local max memory and filled with the local buckets
+   */
+  @Override
+  public PRLoad getLoad(PartitionedRegion pr) {
+    // NOTE(review): assumes pr has a local data store; confirm that
+    // getDataStore() cannot return null for the regions this is called on.
+    PartitionedRegionDataStore ds = pr.getDataStore();
+    int configuredBucketCount = pr.getTotalNumberOfBuckets();
+    PRLoad prLoad = new PRLoad(
+        configuredBucketCount, pr.getLocalMaxMemory());
+
+    for(Integer bidInt : ds.getAllLocalBucketIds()) {
+      int bid = bidInt.intValue();
+
+      BucketAdvisor bucketAdvisor = pr.getRegionAdvisor().
+          getBucket(bid).getBucketAdvisor();
+      //Wait for a primary to exist for this bucket, because
+      //it might be this member.
+      bucketAdvisor.getPrimary();
+      //Each bucket is added with the constant 1 instead of a byte size,
+      //plus 1 or 0 depending on whether this member is its primary.
+      prLoad.addBucket(bid, 1, bucketAdvisor.isPrimary() ? 1 : 0);
+    }
+
+    return prLoad;
+  }
+
+  /** Reads nothing: this class declares no instance fields. */
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+  }
+
+  /** Writes nothing: this class declares no instance fields. */
+  @Override
+  public void toData(DataOutput out) throws IOException {
+  }
+
+  /** @return the fixed id {@code BUCKET_COUNT_LOAD_PROBE} */
+  @Override
+  public int getDSFID() {
+    return BUCKET_COUNT_LOAD_PROBE;
+  }
+
+  /** @return always {@code null} */
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/442faa06/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
index 9029b8d..88516fe 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
@@ -43,6 +43,7 @@ import com.gemstone.gemfire.cache.DiskStoreFactory;
 import com.gemstone.gemfire.cache.EntryEvent;
 import com.gemstone.gemfire.cache.EvictionAction;
 import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.GemFireCache;
 import com.gemstone.gemfire.cache.LoaderHelper;
 import com.gemstone.gemfire.cache.PartitionAttributes;
 import com.gemstone.gemfire.cache.PartitionAttributesFactory;
@@ -66,9 +67,12 @@ import com.gemstone.gemfire.internal.cache.BucketRegion;
 import com.gemstone.gemfire.internal.cache.ColocationHelper;
 import com.gemstone.gemfire.internal.cache.DiskStoreImpl;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.InternalCache;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
 import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceObserverAdapter;
+import com.gemstone.gemfire.internal.cache.partitioned.BucketCountLoadProbe;
+import com.gemstone.gemfire.internal.cache.partitioned.LoadProbe;
 
 import dunit.AsyncInvocation;
 import dunit.Host;
@@ -2810,13 +2814,144 @@ public class RebalanceOperationDUnitTest extends CacheTestCase {
     moveBucketsWithUnrecoveredValuesRedundancy(false);
   }
   
+  /**
+   * Runs the bucket count balancing scenario with the simulate flag set.
+   */
+  public void testBalanceBucketsByCountSimulation() {
+    final boolean simulate = true;
+    balanceBucketsByCount(simulate);
+  }
+  
+  /**
+   * Runs the bucket count balancing scenario with the simulate flag cleared.
+   */
+  public void testBalanceBucketsByCount() {
+    final boolean simulate = false;
+    balanceBucketsByCount(simulate);
+  }
+  
   /**
    * Check to make sure that we balance
-   * buckets between two hosts with no redundancy,
-   * 
-   * even if the values have not yet been faulted in from disk.
+   * buckets between two hosts with no redundancy.
    * @param simulate
    */
+  public void balanceBucketsByCount(final boolean simulate) {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    //Install the bucket count probe in vm0, where the rebalance is started; the finally block restores the old probe
+    LoadProbe oldProbe = setLoadProbe(vm0, new BucketCountLoadProbe());
+    try {
+      SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
+        public void run()
+        {
+          Cache cache = getCache();
+          AttributesFactory attr = new AttributesFactory();
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          paf.setRedundantCopies(0);
+          paf.setRecoveryDelay(-1);
+          paf.setStartupRecoveryDelay(-1);
+          PartitionAttributes prAttr = paf.create();
+          attr.setPartitionAttributes(prAttr);
+          attr.setCacheLoader(new Bug40228Loader());
+          cache.createRegion("region1", attr.create());
+        }
+      };
+
+      //Create the region in only 1 VM
+      vm0.invoke(createPrRegion);
+
+      //Create 6 buckets with very uneven sizes: one 1MB value and five one-character strings
+      vm0.invoke(new SerializableRunnable("createSomeBuckets") {
+
+        public void run() {
+          Cache cache = getCache();
+          Region region = cache.getRegion("region1");
+          region.put(Integer.valueOf(1), new byte[1024 * 1024]);
+          region.put(Integer.valueOf(2), "A");
+          region.put(Integer.valueOf(3), "A");
+          region.put(Integer.valueOf(4), "A");
+          region.put(Integer.valueOf(5), "A");
+          region.put(Integer.valueOf(6), "A");
+        }
+      });
+
+      //Create the region in the other VM (should have no effect)
+      vm1.invoke(createPrRegion);
+
+      //Now do the rebalance from vm0, passing the simulate flag through to doRebalance
+      vm0.invoke(new SerializableRunnable("simulateRebalance") {
+
+        public void run() {
+          Cache cache = getCache();
+          ResourceManager manager = cache.getResourceManager();
+          RebalanceResults results = doRebalance(simulate, manager);
+          assertEquals(0, results.getTotalBucketCreatesCompleted());
+          assertEquals(0, results.getTotalPrimaryTransfersCompleted());
+          assertEquals(3, results.getTotalBucketTransfersCompleted());
+          assertTrue(0 < results.getTotalBucketTransferBytes());
+          Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
+          assertEquals(1, detailSet.size());
+          PartitionRebalanceInfo details = detailSet.iterator().next();
+          assertEquals(0, details.getBucketCreatesCompleted());
+          assertEquals(0, details.getPrimaryTransfersCompleted());
+          assertTrue(0 < details.getBucketTransferBytes());
+          assertEquals(3, details.getBucketTransfersCompleted());
+          //The reported end state must be 2 members with 3 buckets each, all of them primary
+          Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
+          assertEquals(2, afterDetails.size());
+          for(PartitionMemberInfo memberDetails: afterDetails) {
+            assertEquals(3, memberDetails.getBucketCount());
+            assertEquals(3, memberDetails.getPrimaryCount());
+          }
+          if(!simulate) {
+            verifyStats(manager, results);
+          }
+        }
+      });
+      //Only when simulate is false: check the resulting bucket layout from both VMs
+      if(!simulate) {
+        SerializableRunnable checkRedundancyFixed = new SerializableRunnable("checkRedundancyFixed") {
+
+          public void run() {
+            Cache cache = getCache();
+            Region region = cache.getRegion("region1");
+            PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
+            assertEquals(6, details.getCreatedBucketCount());
+            assertEquals(0,details.getActualRedundantCopies());
+            assertEquals(0,details.getLowRedundancyBucketCount());
+            assertEquals(2, details.getPartitionMemberInfo().size());
+            for(PartitionMemberInfo memberDetails: details.getPartitionMemberInfo()) {
+              assertEquals(3, memberDetails.getBucketCount());
+              assertEquals(3, memberDetails.getPrimaryCount());
+            }
+
+            //check to make sure that moving buckets didn't close the cache loader
+            Bug40228Loader loader = (Bug40228Loader) cache.getRegion("region1").getAttributes().getCacheLoader();
+            assertFalse(loader.isClosed());
+          }
+        };
+
+        vm0.invoke(checkRedundancyFixed);
+        vm1.invoke(checkRedundancyFixed);
+      }
+    } finally {
+      setLoadProbe(vm0, oldProbe);
+    }
+  }
+  
+  /**
+   * Installs the given load probe on the resource manager of the cache
+   * running in the given VM.
+   *
+   * @param vm the VM in which the probe is replaced
+   * @param probe the load probe to install
+   * @return whatever the remote setLoadProbe call returned, cast to LoadProbe
+   */
+  private LoadProbe setLoadProbe(VM vm, final LoadProbe probe) {
+    SerializableCallable swapProbe = new SerializableCallable("set load probe") {
+
+      public Object call() {
+        GemFireCacheImpl cacheImpl = (GemFireCacheImpl) getCache();
+        InternalResourceManager resourceManager = cacheImpl.getResourceManager();
+        return resourceManager.setLoadProbe(probe);
+      }
+    };
+
+    return (LoadProbe) vm.invoke(swapProbe);
+  }
+
+  /**
+   * Check to make sure that we balance
+   * buckets between two hosts with no redundancy,
+   * even if the values have not yet been faulted in from disk.
+   * @param simulate
+   */
   public void moveBucketsWithUnrecoveredValuesRedundancy(final boolean simulate) {
     Host host = Host.getHost(0);
     VM vm0 = host.getVM(0);