You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2018/03/14 17:14:20 UTC
[geode] branch develop updated: GEODE-4689: Colocation complete
listeners added (#1565)
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new d621b47 GEODE-4689: Colocation complete listeners added (#1565)
d621b47 is described below
commit d621b47d6d3bd9c6b285b37ca61a25e420b5fc83
Author: Nabarun Nag <na...@users.noreply.github.com>
AuthorDate: Wed Mar 14 10:14:17 2018 -0700
GEODE-4689: Colocation complete listeners added (#1565)
* PartitionRegion can now add a colocation listener to itself which is triggered when the colocation is completed.
* Lucene indexRepositoryFactory adds colocation complete listener to the file region
* when the colocation of the file region is completed the compute repository is called
---
.../geode/internal/cache/ColocationListener.java | 27 +++++++++++++
.../internal/cache/PartitionRegionConfig.java | 5 +--
.../geode/internal/cache/PartitionedRegion.java | 23 +++++++++--
.../cache/PartitionRegionConfigJUnitTest.java | 35 ++++++++++++++++
.../cache/PartitionedRegionCreationJUnitTest.java | 1 +
.../lucene/internal/IndexRepositoryFactory.java | 25 ++++++++++--
.../LuceneFileRegionColocationListener.java | 47 ++++++++++++++++++++++
.../cache/lucene/internal/LuceneServiceImpl.java | 4 +-
.../internal/PartitionedRepositoryManager.java | 2 +-
.../lucene/internal/RawIndexRepositoryFactory.java | 4 +-
.../internal/RawLuceneRepositoryManager.java | 2 +-
.../cache/lucene/test/IndexRepositorySpy.java | 8 ++--
12 files changed, 163 insertions(+), 20 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationListener.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationListener.java
new file mode 100644
index 0000000..152c7f4
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+/*
+ * Callbacks that get executed when the region colocation is completed
+ */
+public interface ColocationListener {
+ /*
+ * execute the call back after the region colocation has completed
+ */
+ default void afterColocationCompleted() {
+
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionRegionConfig.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionRegionConfig.java
index 799d8d7..4669870 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionRegionConfig.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionRegionConfig.java
@@ -24,7 +24,6 @@ import java.util.*;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.ExpirationAttributes;
-import org.apache.geode.cache.FixedPartitionAttributes;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.partition.PartitionListener;
@@ -32,7 +31,6 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.internal.ExternalizableDSFID;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.PartitionAttributesImpl;
import org.apache.geode.internal.util.Versionable;
import org.apache.geode.internal.util.VersionedArrayList;
@@ -343,8 +341,9 @@ public class PartitionRegionConfig extends ExternalizableDSFID implements Versio
isDestroying = true;
}
- void setColocationComplete() {
+ void setColocationComplete(PartitionedRegion partitionedRegion) {
this.isColocationComplete = true;
+ partitionedRegion.executeColocationCallbacks();
}
public boolean isGreaterNodeListVersion(final PartitionRegionConfig other) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 4287f0e..a1cba79 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -14,7 +14,7 @@
*/
package org.apache.geode.internal.cache;
-import static org.apache.geode.internal.lang.SystemUtils.*;
+import static org.apache.geode.internal.lang.SystemUtils.getLineSeparator;
import java.io.IOException;
import java.io.InputStream;
@@ -165,7 +165,6 @@ import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceT
import org.apache.geode.internal.cache.control.MemoryEvent;
import org.apache.geode.internal.cache.control.MemoryThresholds;
import org.apache.geode.internal.cache.eviction.EvictionController;
-import org.apache.geode.internal.cache.eviction.EvictionCounters;
import org.apache.geode.internal.cache.eviction.HeapEvictor;
import org.apache.geode.internal.cache.execute.AbstractExecution;
import org.apache.geode.internal.cache.execute.FunctionExecutionNodePruner;
@@ -239,6 +238,7 @@ import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
+import org.apache.geode.internal.concurrent.ConcurrentHashSet;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
@@ -463,6 +463,19 @@ public class PartitionedRegion extends LocalRegion
private final Object indexLock = new Object();
+ private final Set<ColocationListener> colocationListeners = new ConcurrentHashSet<>();
+
+ public void addColocationListener(ColocationListener colocationListener) {
+ if (colocationListener != null) {
+ colocationListeners.add(colocationListener);
+ }
+ }
+
+ public void removeColocationListener(ColocationListener colocationListener) {
+ colocationListeners.remove(colocationListener);
+ }
+
+
static PRIdMap getPrIdToPR() {
return prIdToPR;
}
@@ -1528,7 +1541,7 @@ public class PartitionedRegion extends LocalRegion
this.prRoot.get(colocatedRegion.getRegionIdentifier());
if (parentConf.isColocationComplete() && parentConf.hasSameDataStoreMembers(prConfig)) {
colocationComplete = true;
- prConfig.setColocationComplete();
+ prConfig.setColocationComplete(this);
}
}
@@ -1546,6 +1559,10 @@ public class PartitionedRegion extends LocalRegion
}
}
+ void executeColocationCallbacks() {
+ colocationListeners.stream().forEach(ColocationListener::afterColocationCompleted);
+ }
+
/**
* @param access true if caller wants last accessed time updated
* @param allowTombstones - whether a tombstone can be returned
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionRegionConfigJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionRegionConfigJUnitTest.java
new file mode 100644
index 0000000..401fbc5
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionRegionConfigJUnitTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class PartitionRegionConfigJUnitTest {
+ @Test
+ public void whenSetColocationCompleteThenAllColocationListenersMustBeExecuted() {
+ PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+ PartitionRegionConfig partitionRegionConfig = new PartitionRegionConfig();
+ partitionRegionConfig.setColocationComplete(partitionedRegion);
+ verify(partitionedRegion, times(1)).executeColocationCallbacks();
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionCreationJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionCreationJUnitTest.java
index abdd811..39b0125 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionCreationJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionCreationJUnitTest.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.Iterator;
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
index f8e83be..618aa29 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
@@ -50,8 +50,8 @@ public class IndexRepositoryFactory {
public IndexRepositoryFactory() {}
public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSerializer serializer,
- InternalLuceneIndex index, PartitionedRegion userRegion, final IndexRepository oldRepository)
- throws IOException {
+ InternalLuceneIndex index, PartitionedRegion userRegion, final IndexRepository oldRepository,
+ PartitionedRepositoryManager partitionedRepositoryManager) throws IOException {
LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index;
final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion();
@@ -59,10 +59,27 @@ public class IndexRepositoryFactory {
Region prRoot = PartitionedRegionHelper.getPRRoot(fileRegion.getCache());
PartitionRegionConfig prConfig =
(PartitionRegionConfig) prRoot.get(fileRegion.getRegionIdentifier());
- while (!prConfig.isColocationComplete()) {
- prConfig = (PartitionRegionConfig) prRoot.get(fileRegion.getRegionIdentifier());
+ LuceneFileRegionColocationListener luceneFileRegionColocationCompleteListener =
+ new LuceneFileRegionColocationListener(partitionedRepositoryManager, bucketId);
+ fileRegion.addColocationListener(luceneFileRegionColocationCompleteListener);
+ IndexRepository repo = null;
+ if (prConfig.isColocationComplete()) {
+ repo = finishComputingRepository(bucketId, serializer, userRegion, oldRepository, index);
}
+ return repo;
+ }
+ /*
+ * NOTE: The method finishComputingRepository must be called through computeIndexRepository.
+ * Executing finishComputingRepository outside of computeIndexRepository may result in race
+ * conditions.
+ * This is a util function just to not let computeIndexRepository be a huge chunk of code.
+ */
+ private IndexRepository finishComputingRepository(Integer bucketId, LuceneSerializer serializer,
+ PartitionedRegion userRegion, IndexRepository oldRepository, InternalLuceneIndex index)
+ throws IOException {
+ LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index;
+ final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion();
BucketRegion fileAndChunkBucket = getMatchingBucket(fileRegion, bucketId);
BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId);
boolean success = false;
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneFileRegionColocationListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneFileRegionColocationListener.java
new file mode 100644
index 0000000..7298283
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneFileRegionColocationListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene.internal;
+
+import org.apache.geode.internal.cache.ColocationListener;
+
+public class LuceneFileRegionColocationListener implements ColocationListener {
+ private final PartitionedRepositoryManager partitionedRepositoryManager;
+ private final Integer bucketID;
+
+ public LuceneFileRegionColocationListener(
+ PartitionedRepositoryManager partitionedRepositoryManager, Integer bucketID) {
+ this.partitionedRepositoryManager = partitionedRepositoryManager;
+ this.bucketID = bucketID;
+ }
+
+
+ @Override
+ public void afterColocationCompleted() {
+ this.partitionedRepositoryManager.computeRepository(this.bucketID);
+ }
+
+ // Current implementation will allow only one LuceneFileRegionColocationListener to be
+ // added to the PartitionRegion colocationListener set.
+ @Override
+ public int hashCode() {
+ return bucketID.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return (obj instanceof LuceneFileRegionColocationListener
+ && ((LuceneFileRegionColocationListener) obj).bucketID == this.bucketID);
+ }
+}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index b2c2412..5d0ea48 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -242,11 +242,11 @@ public class LuceneServiceImpl implements InternalLuceneService {
protected boolean createLuceneIndexOnDataRegion(final PartitionedRegion userRegion,
final InternalLuceneIndex luceneIndex) {
try {
- PartitionedRepositoryManager repositoryManager =
- (PartitionedRepositoryManager) luceneIndex.getRepositoryManager();
if (userRegion.getDataStore() == null) {
return true;
}
+ PartitionedRepositoryManager repositoryManager =
+ (PartitionedRepositoryManager) luceneIndex.getRepositoryManager();
Set<Integer> primaryBucketIds = userRegion.getDataStore().getAllLocalPrimaryBucketIds();
Iterator primaryBucketIterator = primaryBucketIds.iterator();
while (primaryBucketIterator.hasNext()) {
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
index 9e90e95..f60f83b 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
@@ -117,7 +117,7 @@ public class PartitionedRepositoryManager implements RepositoryManager {
InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository oldRepository)
throws IOException {
return indexRepositoryFactory.computeIndexRepository(bucketId, serializer, index, userRegion,
- oldRepository);
+ oldRepository, this);
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java
index 8f8e09e..984d3eb 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java
@@ -34,8 +34,8 @@ public class RawIndexRepositoryFactory extends IndexRepositoryFactory {
@Override
public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSerializer serializer,
- InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository oldRepository)
- throws IOException {
+ InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository oldRepository,
+ PartitionedRepositoryManager partitionedRepositoryManager) throws IOException {
final IndexRepository repo;
if (oldRepository != null) {
oldRepository.cleanup();
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
index f47e11e..0b38c45 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
@@ -51,6 +51,6 @@ public class RawLuceneRepositoryManager extends PartitionedRepositoryManager {
InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository oldRepository)
throws IOException {
return indexRepositoryFactory.computeIndexRepository(bucketId, serializer, index, userRegion,
- oldRepository);
+ oldRepository, this);
}
}
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/IndexRepositorySpy.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/IndexRepositorySpy.java
index d80ca63..37a1e6d 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/IndexRepositorySpy.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/IndexRepositorySpy.java
@@ -50,10 +50,10 @@ public class IndexRepositorySpy extends IndexRepositoryFactory {
@Override
public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSerializer serializer,
- InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository oldRepository)
- throws IOException {
- final IndexRepository indexRepo =
- super.computeIndexRepository(bucketId, serializer, index, userRegion, oldRepository);
+ InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository oldRepository,
+ PartitionedRepositoryManager partitionedRepositoryManager) throws IOException {
+ final IndexRepository indexRepo = super.computeIndexRepository(bucketId, serializer, index,
+ userRegion, oldRepository, partitionedRepositoryManager);
if (indexRepo == null) {
return null;
}
--
To stop receiving notification emails like this one, please contact
nnag@apache.org.