You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:43:00 UTC
[012/100] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/ConcurrentIndexInitOnOverflowRegionDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/ConcurrentIndexInitOnOverflowRegionDUnitTest.java
index 5528299,0000000..ca90f00
mode 100644,000000..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/ConcurrentIndexInitOnOverflowRegionDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/ConcurrentIndexInitOnOverflowRegionDUnitTest.java
@@@ -1,468 -1,0 +1,467 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package com.gemstone.gemfire.cache.query.internal.index;
+
+import java.io.IOException;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAlgorithm;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.client.ClientCache;
+import com.gemstone.gemfire.cache.client.ClientCacheFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
+import com.gemstone.gemfire.cache.query.Index;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.data.Portfolio;
+import com.gemstone.gemfire.cache.query.data.PortfolioData;
+import com.gemstone.gemfire.cache.query.internal.index.IndexManager.TestHook;
+import com.gemstone.gemfire.cache.query.partitioned.PRQueryDUnitHelper;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.internal.cache.EvictionAttributesImpl;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.NetworkUtils;
+import com.gemstone.gemfire.test.dunit.ThreadUtils;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+
+/**
+ * @author shobhit
+ *
+ */
+public class ConcurrentIndexInitOnOverflowRegionDUnitTest extends CacheTestCase {
+
+ PRQueryDUnitHelper PRQHelp = new PRQueryDUnitHelper("");
+
+ String name;
+
+ final int redundancy = 0;
+
+ private int cnt = 0;
+
+ private int cntDest = 1;
+
+ public static volatile boolean hooked = false;
+
+ private static int bridgeServerPort;
+
+ /**
+ * @param name
+ */
+ public ConcurrentIndexInitOnOverflowRegionDUnitTest(String name) {
+ super(name);
+ }
+
+ /**
+ *
+ */
+ public void testAsyncIndexInitDuringEntryDestroyAndQueryOnRR() {
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+
+ name = "PartionedPortfoliosPR";
+ // Create Overflow Persistent Partition Region
+ vm0.invoke(new CacheSerializableRunnable(
+ "Create local region with synchronous index maintenance") {
+ @Override
+ public void run2() throws CacheException {
+ Cache cache = PRQHelp.getCache();
+ Region partitionRegion = null;
+ IndexManager.testHook = null;
+ try {
+ DiskStore ds = cache.findDiskStore("disk");
+ if (ds == null) {
+ ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs())
+ .create("disk");
+ }
+ AttributesFactory attr = new AttributesFactory();
+ attr.setValueConstraint(PortfolioData.class);
+ attr.setIndexMaintenanceSynchronous(true);
+ EvictionAttributesImpl evicAttr = new EvictionAttributesImpl()
+ .setAction(EvictionAction.OVERFLOW_TO_DISK);
+ evicAttr.setAlgorithm(EvictionAlgorithm.LRU_ENTRY).setMaximum(1);
+ attr.setEvictionAttributes(evicAttr);
+ attr.setDataPolicy(DataPolicy.REPLICATE);
+ // attr.setPartitionAttributes(new
+ // PartitionAttributesFactory().setTotalNumBuckets(1).create());
+ attr.setDiskStoreName("disk");
+ RegionFactory regionFactory = cache.createRegionFactory(attr.create());
+ partitionRegion = regionFactory.create(name);
+ } catch (IllegalStateException ex) {
+ LogWriterUtils.getLogWriter().warning("Creation caught IllegalStateException", ex);
+ }
+ assertNotNull("Region " + name + " not in cache", cache.getRegion(name));
+ assertNotNull("Region ref null", partitionRegion);
+ assertTrue("Region ref claims to be destroyed",
+ !partitionRegion.isDestroyed());
+ // Create Indexes
+ try {
+ Index index = cache.getQueryService().createIndex("statusIndex",
+ "p.status", "/" + name + " p");
+ assertNotNull(index);
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ fail("Index creation failed");
+ }
+ }
+ });
+
+ // Start changing the value in Region which should turn into a deadlock if
+ // the fix is not there
+ AsyncInvocation asyncInv1 = vm0.invokeAsync(new CacheSerializableRunnable(
+ "Change value in region") {
+
+ @Override
+ public void run2() throws CacheException {
+ Cache cache = PRQHelp.getCache();
+
+ // Do a put in region.
+ Region r = PRQHelp.getCache().getRegion(name);
+
+ for (int i = 0; i < 100; i++) {
+ r.put(i, new PortfolioData(i));
+ }
+
+ assertNull(IndexManager.testHook);
+ IndexManager.testHook = new IndexManagerTestHook();
+
+ // Destroy one of the values.
+ PRQHelp.getCache().getLogger().fine("Destroying the value");
+ r.destroy(1);
+
+ IndexManager.testHook = null;
+ }
+ });
+
+ AsyncInvocation asyncInv2 = vm0.invokeAsync(new CacheSerializableRunnable(
+ "Run query on region") {
+
+ @Override
+ public void run2() throws CacheException {
+ Cache cache = PRQHelp.getCache();
+
+ while (!hooked) {
+ Wait.pause(100);
+ }
+ // Create and hence initialize Index
+ try {
+ Index index = cache.getQueryService().createIndex("idIndex",
+ "p.ID", "/" + name + " p");
+ assertNotNull(index);
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ fail("Index creation failed");
+ }
+ }
+ });
+
+ // If we take more than 30 seconds then its a deadlock.
+ ThreadUtils.join(asyncInv2, 30 * 1000);
+ ThreadUtils.join(asyncInv1, 30 * 1000);
+ }
+
+ /**
+ *
+ */
+ public void testAsyncIndexInitDuringEntryPutUsingClientOnRR() {
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ IgnoredException.addIgnoredException("Unexpected IOException:");
+ IgnoredException.addIgnoredException("java.net.SocketException");
+
+ name = "PartionedPortfoliosPR";
+ // Create Overflow Persistent Partition Region
+ vm0.invoke(new CacheSerializableRunnable(
+ "Create local region with synchronous index maintenance") {
+ @Override
+ public void run2() throws CacheException {
+ Cache cache = PRQHelp.getCache();
+
+ Region partitionRegion = null;
+ IndexManager.testHook = null;
+ try {
+ CacheServer bridge = cache.addCacheServer();
+ bridge.setPort(0);
+ bridge.start();
+ bridgeServerPort = bridge.getPort();
+
+ DiskStore ds = cache.findDiskStore("disk");
+ if (ds == null) {
+ ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs())
+ .create("disk");
+ }
+ AttributesFactory attr = new AttributesFactory();
+ attr.setValueConstraint(PortfolioData.class);
+ attr.setIndexMaintenanceSynchronous(true);
+ EvictionAttributesImpl evicAttr = new EvictionAttributesImpl()
+ .setAction(EvictionAction.OVERFLOW_TO_DISK);
+ evicAttr.setAlgorithm(EvictionAlgorithm.LRU_ENTRY).setMaximum(1);
+ attr.setEvictionAttributes(evicAttr);
+ attr.setDataPolicy(DataPolicy.REPLICATE);
+ // attr.setPartitionAttributes(new
+ // PartitionAttributesFactory().setTotalNumBuckets(1).create());
+ attr.setDiskStoreName("disk");
+ RegionFactory regionFactory = cache.createRegionFactory(attr.create());
+ partitionRegion = regionFactory.create(name);
+ } catch (IllegalStateException ex) {
+ LogWriterUtils.getLogWriter().warning("Creation caught IllegalStateException", ex);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ assertNotNull("Region " + name + " not in cache", cache.getRegion(name));
+ assertNotNull("Region ref null", partitionRegion);
+ assertTrue("Region ref claims to be destroyed", !partitionRegion.isDestroyed());
+ // Create Indexes
+ try {
+ Index index = cache.getQueryService().createIndex("idIndex",
+ "p.ID", "/" + name + " p");
+ assertNotNull(index);
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ fail("Index creation failed");
+ }
+ }
+ });
+
+
- final int port = vm0.invokeInt(ConcurrentIndexInitOnOverflowRegionDUnitTest.class,
- "getCacheServerPort");
++ final int port = vm0.invoke(() -> ConcurrentIndexInitOnOverflowRegionDUnitTest.getCacheServerPort());
+ final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+ // Start changing the value in Region which should turn into a deadlock if
+ // the fix is not there
+ vm1.invoke(new CacheSerializableRunnable(
+ "Change value in region") {
+
+ @Override
+ public void run2() throws CacheException {
+ disconnectFromDS();
+ ClientCache clientCache = new ClientCacheFactory().addPoolServer(host0, port).create();
+
+ // Do a put in region.
+ Region r = clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(name);
+
+ for (int i = 0; i < 100; i++) {
+ r.put(i, new PortfolioData(i));
+ }
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Set Test Hook") {
+
+ @Override
+ public void run2() throws CacheException {
+ // Set test hook before client operation
+ assertNull(IndexManager.testHook);
+ IndexManager.testHook = new IndexManagerTestHook();
+ }
+ });
+
+ AsyncInvocation asyncInv1 = vm1.invokeAsync(new CacheSerializableRunnable("Change value in region") {
+
+ @Override
+ public void run2() throws CacheException {
+ ClientCache clientCache = ClientCacheFactory.getAnyInstance();
+
+ // Do a put in region.
+ Region r = clientCache.getRegion(name);
+
+ // Destroy one of the values.
+ clientCache.getLogger().fine("Destroying the value");
+ r.destroy(1);
+ }
+ });
+
+ AsyncInvocation asyncInv2 = vm0.invokeAsync(new CacheSerializableRunnable(
+ "Run query on region") {
+
+ @Override
+ public void run2() throws CacheException {
+ Cache cache = PRQHelp.getCache();
+
+ while (!hooked) {
+ Wait.pause(100);
+ }
+ // Create Indexes
+ try {
+ Index index = cache.getQueryService().createIndex("statusIndex",
+ "p.status", "/" + name + " p");
+ assertNotNull(index);
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ fail("Index creation failed");
+ }
+ }
+ });
+
+ // If we take more than 30 seconds then its a deadlock.
+ ThreadUtils.join(asyncInv2, 30 * 1000);
+ ThreadUtils.join(asyncInv1, 30 * 1000);
+
+ vm0.invoke(new CacheSerializableRunnable("Set Test Hook") {
+
+ @Override
+ public void run2() throws CacheException {
+ assertNotNull(IndexManager.testHook);
+ IndexManager.testHook = null;
+ }
+ });
+
+ }
+
+ /**
+ * This tests if index updates are blocked while region.clear() is
+ * called and indexes are being reinitialized.
+ */
+ public void testIndexUpdateWithRegionClear() {
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+
+ final String regionName = "portfolio";
+
+ hooked = false;
+
+ // Create region and an index on it
+ vm0.invoke(new CacheSerializableRunnable("Create region and index") {
+
+ @Override
+ public void run2() throws CacheException {
+ Cache cache = PRQHelp.getCache();
+ Region region = cache.createRegionFactory(RegionShortcut.LOCAL).create(regionName);
+ QueryService qService = cache.getQueryService();
+
+ try {
+ qService.createIndex("idIndex", "ID", "/"+regionName);
+ qService.createIndex("secIdIndex", "pos.secId", "/" + regionName + " p, p.positions.values pos");
+ } catch (Exception e) {
+ fail("Index creation failed." + e);
+ }
+ }
+ });
+
+ final class LocalTestHook implements TestHook {
+
+ @Override
+ public void hook(int spot) throws RuntimeException {
+ switch (spot) {
+ case 6: // processAction in IndexManager
+ hooked = true;
+ //wait untill some thread unhooks.
+ while (hooked) { Wait.pause(20); }
+ break;
+ default:
+ break;
+ }
+ }
+
+ }
+
+ // Asynch invocation for continuous index updates
+ AsyncInvocation indexUpdateAsysnch = vm0.invokeAsync(new CacheSerializableRunnable("index updates") {
+
+ @Override
+ public void run2() throws CacheException {
+
+ Region region = PRQHelp.getCache().getRegion(regionName);
+ for (int i=0; i<100; i++) {
+ if (i == 50) IndexManager.testHook = new LocalTestHook();
+ region.put(i, new Portfolio(i));
+ if (i == 50) Wait.pause(20);
+ }
+ }
+ });
+
+ // Region.clear() which should block other region updates.
+ vm0.invoke(new CacheSerializableRunnable("Clear the region") {
+
+ @Override
+ public void run2() throws CacheException {
+ Region region = PRQHelp.getCache().getRegion(regionName);
+
+ while(!hooked) {
+ Wait.pause(100);
+ }
+ if (hooked) {
+ hooked = false;
+ IndexManager.testHook = null;
+ region.clear();
+ }
+
+ try {
+ QueryService qservice = PRQHelp.getCache().getQueryService();
+ Index index = qservice.getIndex(region, "idIndex");
+ if (((CompactRangeIndex)index).getIndexStorage().size() > 1) {
+ fail("After clear region size is supposed to be zero as all index updates are blocked. Current region size is: "+ region.size());
+ }
+ } finally {
+ IndexManager.testHook = null;
+ }
+ }
+ });
+
+ // Kill asynch thread
+ ThreadUtils.join(indexUpdateAsysnch, 20000);
+
+ //Verify region size which must be 50
+ vm0.invoke(new CacheSerializableRunnable("Check region size") {
+
+ @Override
+ public void run2() throws CacheException {
+ Region region = PRQHelp.getCache().getRegion(regionName);
+ if (region.size() > 50) {
+ fail("After clear region size is supposed to be 50 as all index updates are blocked " + region.size());
+ }
+ }
+ });
+ }
+
+ public class IndexManagerTestHook implements
+ com.gemstone.gemfire.cache.query.internal.index.IndexManager.TestHook {
+ public void hook(final int spot) throws RuntimeException {
+ switch (spot) {
+ case 6: // Before Index update and after region entry lock.
+ hooked = true;
+ LogWriterUtils.getLogWriter().fine("IndexManagerTestHook is hooked.");
+ Wait.pause(10000);
+ hooked = false;
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ private static int getCacheServerPort() {
+ return bridgeServerPort;
+ }
+}