You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/12/14 22:56:33 UTC
[geode] branch feature/GEODE-5786 updated: GEODE-5786: Create
txEntryState based on createIfAbsent condition.
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-5786
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-5786 by this push:
new fa89c91 GEODE-5786: Create txEntryState based on createIfAbsent condition.
fa89c91 is described below
commit fa89c91456a71094c944d9bafdc9a652fbd7259e
Author: eshu <es...@pivotal.io>
AuthorDate: Fri Dec 14 14:41:42 2018 -0800
GEODE-5786: Create txEntryState based on createIfAbsent condition.
* Do not create txEntryState for entries with tombstones during set operations - keySet, values, etc.
* Only create txEntryState for tombstones during get operation.
Co-authored-by: lgallinat
---
...erRepeatableReadTransactionDistributedTest.java | 220 +++++++++++++++++++++
.../RepeatableReadTransactionDistributedTest.java | 208 +++++++++++++++++++
.../java/org/apache/geode/TXJUnitTest.java | 97 ++++++++-
.../apache/geode/internal/cache/EntriesSet.java | 6 +-
.../geode/internal/cache/InternalDataView.java | 2 +-
.../apache/geode/internal/cache/LocalRegion.java | 2 +-
.../geode/internal/cache/LocalRegionDataView.java | 2 +-
.../internal/cache/PausedTXStateProxyImpl.java | 2 +-
.../org/apache/geode/internal/cache/TXEntry.java | 7 +-
.../org/apache/geode/internal/cache/TXState.java | 35 ++--
.../geode/internal/cache/TXStateInterface.java | 7 -
.../geode/internal/cache/TXStateProxyImpl.java | 4 +-
.../apache/geode/internal/cache/TXStateStub.java | 2 +-
13 files changed, 559 insertions(+), 35 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerRepeatableReadTransactionDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerRepeatableReadTransactionDistributedTest.java
new file mode 100644
index 0000000..7878f3f
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerRepeatableReadTransactionDistributedTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.Serializable;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+public class ClientServerRepeatableReadTransactionDistributedTest implements Serializable {
+ private String hostName;
+ private String uniqueName;
+ private String regionName;
+ private VM server1;
+ private int port1;
+
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule();
+
+ @Rule
+ public CacheRule cacheRule = new CacheRule();
+
+ @Rule
+ public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+ @Before
+ public void setUp() {
+ server1 = getVM(0);
+
+ hostName = getHostName();
+ uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+ regionName = uniqueName + "_region";
+ }
+
+ @Test
+ public void valuesRepeatableReadDoesNotIncludeTombstones() {
+ port1 = server1.invoke(() -> createServerRegion(1));
+
+ createClientRegion(port1);
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ region.put("key1", "value1");
+ region.destroy("key1"); // creates a tombstone
+
+ TXManagerImpl txMgr =
+ (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+
+ Region region1 = clientCacheRule.getClientCache().getRegion(regionName);
+ txMgr.begin(); // tx1
+ region1.values().toArray(); // this is a repeatable read
+ TransactionId txId = txMgr.suspend();
+
+ txMgr.begin(); // tx2
+ region1.put("key1", "newValue");
+ txMgr.commit();
+
+ txMgr.resume(txId);
+ region1.put("key1", "value1");
+ txMgr.commit();
+ assertThat(region.get("key1")).isEqualTo("value1");
+ }
+
+ @Test
+ public void keySetRepeatableReadDoesNotIncludeTombstones() {
+ port1 = server1.invoke(() -> createServerRegion(1));
+
+ createClientRegion(port1);
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ region.put("key1", "value1");
+ region.destroy("key1"); // creates a tombstone
+
+ TXManagerImpl txMgr =
+ (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+
+ Region region1 = clientCacheRule.getClientCache().getRegion(regionName);
+ txMgr.begin(); // tx1
+ region1.keySet().toArray(); // this is a repeatable read
+ TransactionId txId = txMgr.suspend();
+
+ txMgr.begin(); // tx2
+ region1.put("key1", "newValue");
+ txMgr.commit();
+
+ txMgr.resume(txId);
+ region1.put("key1", "value1");
+ txMgr.commit();
+ assertThat(region.get("key1")).isEqualTo("value1");
+ }
+
+ @Test
+ public void valuesRepeatableReadIncludesInvalidates() {
+ port1 = server1.invoke(() -> createServerRegion(1));
+
+ createClientRegion(port1);
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ region.put("key1", "value1");
+ region.invalidate("key1");
+
+ TXManagerImpl txMgr =
+ (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+
+ Region region1 = clientCacheRule.getClientCache().getRegion(regionName);
+ txMgr.begin(); // tx1
+ region1.values().toArray(); // this is a repeatable read
+ TransactionId txId = txMgr.suspend();
+
+ txMgr.begin(); // tx2
+ region1.put("key1", "newValue");
+ txMgr.commit();
+
+ txMgr.resume(txId);
+ region1.put("key1", "value1");
+ assertThatThrownBy(() -> txMgr.commit()).isExactlyInstanceOf(CommitConflictException.class);
+ assertThat(region.get("key1")).isEqualTo("newValue");
+ }
+
+ @Test
+ public void keySetRepeatableReadIncludesInvalidates() {
+ port1 = server1.invoke(() -> createServerRegion(1));
+
+ createClientRegion(port1);
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ region.put("key1", "value1");
+ region.invalidate("key1");
+
+ TXManagerImpl txMgr =
+ (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+
+ Region region1 = clientCacheRule.getClientCache().getRegion(regionName);
+ txMgr.begin(); // tx1
+ region1.keySet().toArray(); // this is a repeatable read
+ TransactionId txId = txMgr.suspend();
+
+ txMgr.begin(); // tx2
+ region1.put("key1", "newValue");
+ txMgr.commit();
+
+ txMgr.resume(txId);
+ region1.put("key1", "value1");
+ assertThatThrownBy(() -> txMgr.commit()).isExactlyInstanceOf(CommitConflictException.class);
+ assertThat(region.get("key1")).isEqualTo("newValue");
+ }
+
+ private int createServerRegion(int totalNumBuckets) throws Exception {
+ PartitionAttributesFactory factory = new PartitionAttributesFactory();
+ factory.setTotalNumBuckets(totalNumBuckets);
+ PartitionAttributes partitionAttributes = factory.create();
+ cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
+ .setPartitionAttributes(partitionAttributes).create(regionName);
+
+ CacheServer server = cacheRule.getCache().addCacheServer();
+ server.setPort(0);
+ server.start();
+ return server.getPort();
+ }
+
+ private void createClientRegion(int port) {
+ clientCacheRule.createClientCache();
+
+ CacheServerTestUtil.disableShufflingOfEndpoints();
+ PoolImpl pool;
+ try {
+ pool = getPool(port);
+ } finally {
+ CacheServerTestUtil.enableShufflingOfEndpoints();
+ }
+
+ ClientRegionFactory crf =
+ clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
+ crf.setPoolName(pool.getName());
+ crf.create(regionName);
+ }
+
+ private PoolImpl getPool(int port) {
+ PoolFactory factory = PoolManager.createFactory();
+ factory.addServer(hostName, port);
+ factory.setReadTimeout(12000).setSocketBufferSize(1000);
+ return (PoolImpl) factory.create(uniqueName);
+ }
+
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/RepeatableReadTransactionDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/RepeatableReadTransactionDistributedTest.java
new file mode 100644
index 0000000..7bf2876
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/RepeatableReadTransactionDistributedTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.execute;
+
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.Serializable;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+public class RepeatableReadTransactionDistributedTest implements Serializable {
+ private String uniqueName;
+ private String regionName;
+ private VM server1;
+ private VM server2;
+ private String key = "key";
+ private String originalValue = "originalValue";
+ private String value1 = "value1";
+ private String value2 = "value2";
+
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule();
+
+ @Rule
+ public CacheRule cacheRule = new CacheRule();
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+ @Before
+ public void setUp() {
+ server1 = getVM(0);
+ server2 = getVM(1);
+
+ uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+ regionName = uniqueName + "_region";
+ }
+
+ @Test
+ public void valuesRepeatableReadDoesNotIncludeTombstones() {
+ server1.invoke(() -> createServerRegion(1, false));
+ server2.invoke(() -> createServerRegion(1, true));
+ server2.invoke(() -> doDestroyOps());
+ server2.invoke(() -> doValuesTransactionWithTombstone());
+ }
+
+ @Test
+ public void keySetRepeatableReadDoesNotIncludeTombstones() {
+ server1.invoke(() -> createServerRegion(1, false));
+ server2.invoke(() -> createServerRegion(1, true));
+ server2.invoke(() -> doDestroyOps());
+ server2.invoke(() -> doKeySetTransactionWithTombstone());
+ }
+
+ @Test
+ public void valuesRepeatableReadIncludesInvalidates() {
+ server1.invoke(() -> createServerRegion(1, false));
+ server2.invoke(() -> createServerRegion(1, true));
+ server2.invoke(() -> doInvalidateOps());
+ server2.invoke(() -> doValuesTransactionWithInvalidate());
+ }
+
+ @Test
+ public void keySetRepeatableReadIncludesInvalidates() {
+ server1.invoke(() -> createServerRegion(1, false));
+ server2.invoke(() -> createServerRegion(1, true));
+ server2.invoke(() -> doInvalidateOps());
+ server2.invoke(() -> doKeySetTransactionWithInvalidate());
+ }
+
+ private int createServerRegion(int totalNumBuckets, boolean isAccessor) throws Exception {
+ PartitionAttributesFactory factory = new PartitionAttributesFactory();
+ factory.setTotalNumBuckets(totalNumBuckets);
+ if (isAccessor) {
+ factory.setLocalMaxMemory(0);
+ }
+ PartitionAttributes partitionAttributes = factory.create();
+ cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
+ .setPartitionAttributes(partitionAttributes).create(regionName);
+
+ CacheServer server = cacheRule.getCache().addCacheServer();
+ server.setPort(0);
+ server.start();
+ return server.getPort();
+ }
+
+ private void doDestroyOps() {
+ Region region = cacheRule.getCache().getRegion(regionName);
+ region.put(key, originalValue);
+ region.destroy(key); // creates a tombstone
+ }
+
+ private void doInvalidateOps() {
+ Region region = cacheRule.getCache().getRegion(regionName);
+ region.put(key, originalValue);
+ region.invalidate(key);
+ }
+
+ private void doValuesTransactionWithTombstone() {
+ TXManagerImpl txMgr =
+ (TXManagerImpl) cacheRule.getCache().getCacheTransactionManager();
+
+ Region region = cacheRule.getCache().getRegion(regionName);
+ txMgr.begin(); // tx1
+ region.put("key2", "someValue");
+ region.values().toArray();
+ TransactionId txId = txMgr.suspend();
+
+ txMgr.begin(); // tx2
+ region.put(key, value2);
+ txMgr.commit();
+
+ txMgr.resume(txId);
+ region.put(key, value1);
+ txMgr.commit();
+
+ assertThat(region.get(key)).isEqualTo(value1);
+ }
+
+ private void doKeySetTransactionWithTombstone() {
+ TXManagerImpl txMgr =
+ (TXManagerImpl) cacheRule.getCache().getCacheTransactionManager();
+
+ Region region = cacheRule.getCache().getRegion(regionName);
+ txMgr.begin(); // tx1
+ region.put("key2", "someValue");
+ region.keySet().toArray();
+ TransactionId txId = txMgr.suspend();
+
+ txMgr.begin(); // tx2
+ region.put(key, value2);
+ txMgr.commit();
+
+ txMgr.resume(txId);
+ region.put(key, value1);
+ txMgr.commit();
+
+ assertThat(region.get(key)).isEqualTo(value1);
+ }
+
+ private void doValuesTransactionWithInvalidate() {
+ TXManagerImpl txMgr =
+ (TXManagerImpl) cacheRule.getCache().getCacheTransactionManager();
+
+ Region region = cacheRule.getCache().getRegion(regionName);
+ txMgr.begin(); // tx1
+ region.put("key2", "someValue");
+ region.values().toArray();
+ TransactionId txId = txMgr.suspend();
+
+ txMgr.begin(); // tx2
+ region.put(key, value2);
+ txMgr.commit();
+
+ txMgr.resume(txId);
+ region.put(key, value1);
+ assertThatThrownBy(() -> txMgr.commit()).isExactlyInstanceOf(CommitConflictException.class);
+ assertThat(region.get(key)).isEqualTo(value2);
+ }
+
+ private void doKeySetTransactionWithInvalidate() {
+ TXManagerImpl txMgr =
+ (TXManagerImpl) cacheRule.getCache().getCacheTransactionManager();
+
+ Region region = cacheRule.getCache().getRegion(regionName);
+ txMgr.begin(); // tx1
+ region.put("key2", "someValue");
+ region.keySet().toArray();
+ TransactionId txId = txMgr.suspend();
+
+ txMgr.begin(); // tx2
+ region.put(key, value2);
+ txMgr.commit();
+
+ txMgr.resume(txId);
+ region.put(key, value1);
+ assertThatThrownBy(() -> txMgr.commit()).isExactlyInstanceOf(CommitConflictException.class);
+ assertThat(region.get(key)).isEqualTo(value2);
+ }
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/TXJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/TXJUnitTest.java
index 069e913..408cc6b 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/TXJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/TXJUnitTest.java
@@ -15,6 +15,7 @@
package org.apache.geode;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -142,12 +143,22 @@ public class TXJUnitTest {
}
protected void createRegion() throws Exception {
+ region = createRegion(getClass().getSimpleName(), false);
+ }
+
+ protected Region createRegion(String regionName, boolean isConcurrencyChecksEnabled)
+ throws Exception {
AttributesFactory<String, String> attributesFactory = new AttributesFactory<>();
- attributesFactory.setScope(Scope.DISTRIBUTED_NO_ACK);
- attributesFactory.setConcurrencyChecksEnabled(false); // test validation expects this behavior
+ attributesFactory
+ .setDataPolicy(isConcurrencyChecksEnabled ? DataPolicy.REPLICATE : DataPolicy.NORMAL);
+ attributesFactory
+ .setScope(isConcurrencyChecksEnabled ? Scope.DISTRIBUTED_ACK : Scope.DISTRIBUTED_NO_ACK);
+ attributesFactory.setConcurrencyChecksEnabled(isConcurrencyChecksEnabled); // test validation
+ // expects this
+ // behavior
attributesFactory.setIndexMaintenanceSynchronous(true);
- this.region = this.cache.createRegion(getClass().getSimpleName(), attributesFactory.create());
+ return this.cache.createRegion(regionName, attributesFactory.create());
}
protected void closeCache() {
@@ -5100,6 +5111,86 @@ public class TXJUnitTest {
}
@Test
+ public void valuesRepeatableReadDoesNotIncludeTombstones() throws Exception {
+ Region newRegion = createRegion("newRegion", true);
+ newRegion.put("key1", "value1");
+ newRegion.destroy("key1"); // creates a tombstone
+
+ txMgr.begin(); // tx1
+ newRegion.values().toArray(); // this is a repeatable read, does not read tombstone
+ TransactionId txId = txMgr.suspend();
+
+ txMgr.begin(); // tx2
+ newRegion.put("key1", "newValue");
+ txMgr.commit();
+
+ txMgr.resume(txId);
+ newRegion.put("key1", "value1");
+ txMgr.commit();
+ assertThat(newRegion.get("key1")).isEqualTo("value1");
+ }
+
+ @Test
+ public void keySetRepeatableReadDoesNotIncludeTombstones() throws Exception {
+ Region newRegion = createRegion("newRegion", true);
+ newRegion.put("key1", "value1");
+ newRegion.destroy("key1"); // creates a tombstone
+
+ txMgr.begin(); // tx1
+ newRegion.keySet().toArray(); // this is a repeatable read, does not read tombstone
+ TransactionId txId = txMgr.suspend();
+
+ txMgr.begin(); // tx2
+ newRegion.put("key1", "newValue");
+ txMgr.commit();
+
+ txMgr.resume(txId);
+ newRegion.put("key1", "value1");
+ txMgr.commit();
+ assertThat(newRegion.get("key1")).isEqualTo("value1");
+ }
+
+ @Test
+ public void valuesRepeatableReadIncludesInvalidates() throws Exception {
+ Region newRegion = createRegion("newRegion", true);
+ newRegion.put("key1", "value1");
+ newRegion.invalidate("key1");
+
+ txMgr.begin(); // tx1
+ newRegion.values().toArray(); // this is a repeatable read, reads invalidate
+ TransactionId txId = txMgr.suspend();
+
+ txMgr.begin(); // tx2
+ newRegion.put("key1", "newValue");
+ txMgr.commit();
+
+ txMgr.resume(txId);
+ newRegion.put("key1", "value1");
+ assertThatThrownBy(() -> txMgr.commit()).isExactlyInstanceOf(CommitConflictException.class);
+ assertThat(newRegion.get("key1")).isEqualTo("newValue");
+ }
+
+ @Test
+ public void keySetRepeatableReadIncludesInvalidates() throws Exception {
+ Region newRegion = createRegion("newRegion", true);
+ newRegion.put("key1", "value1");
+ newRegion.invalidate("key1");
+
+ txMgr.begin(); // tx1
+ newRegion.keySet().toArray(); // this is a repeatable read, reads invalidate
+ TransactionId txId = txMgr.suspend();
+
+ txMgr.begin(); // tx2
+ newRegion.put("key1", "newValue");
+ txMgr.commit();
+
+ txMgr.resume(txId);
+ newRegion.put("key1", "value1");
+ assertThatThrownBy(() -> txMgr.commit()).isExactlyInstanceOf(CommitConflictException.class);
+ assertThat(newRegion.get("key1")).isEqualTo("newValue");
+ }
+
+ @Test
public void testRepeatableRead() throws CacheException {
final TXManagerImpl txMgrImpl = (TXManagerImpl) this.txMgr;
TXStateProxy tx;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntriesSet.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntriesSet.java
index 0e169d2..7c9eecf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntriesSet.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntriesSet.java
@@ -178,7 +178,11 @@ public class EntriesSet extends AbstractSet {
} else if (ignoreCopyOnReadForQuery) {
result = ((NonTXEntry) re).getValue(true);
} else {
- result = re.getValue();
+ if ((re instanceof TXEntry)) {
+ result = ((TXEntry) re).getValue(allowTombstones);
+ } else {
+ result = re.getValue();
+ }
}
if (result != null && !Token.isInvalidOrRemoved(result)) { // fix for bug 34583
return result;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalDataView.java
index e658402..fef8dfa 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalDataView.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalDataView.java
@@ -42,7 +42,7 @@ public interface InternalDataView {
@Retained
Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats,
boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent,
- boolean returnTombstones, boolean retainResult);
+ boolean returnTombstones, boolean retainResult, boolean createIfAbsent);
/**
* @param expectedOldValue TODO
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 488cd58..d6d4395 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -1399,7 +1399,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
try {
KeyInfo keyInfo = getKeyInfo(key, aCallbackArgument);
Object value = getDataView().getDeserializedValue(keyInfo, this, true, disableCopyOnRead,
- preferCD, clientEvent, returnTombstones, retainResult);
+ preferCD, clientEvent, returnTombstones, retainResult, true);
final boolean isCreate = value == null;
isMiss = value == null || Token.isInvalid(value)
|| (!returnTombstones && value == Token.TOMBSTONE);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
index 6779574..4761c1b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
@@ -39,7 +39,7 @@ public class LocalRegionDataView implements InternalDataView {
*/
public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats,
boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent,
- boolean returnTombstones, boolean retainResult) {
+ boolean returnTombstones, boolean retainResult, boolean createIfAbsent) {
return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD,
clientEvent, returnTombstones, retainResult);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
index 743780c..3c01f84 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
@@ -110,7 +110,7 @@ public class PausedTXStateProxyImpl implements TXStateProxy {
@Override
public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats,
boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent,
- boolean returnTombstones, boolean retainResult) {
+ boolean returnTombstones, boolean retainResult, boolean createIfAbsent) {
return null;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntry.java
index 4f2bf8a..217e527 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntry.java
@@ -89,12 +89,17 @@ public class TXEntry implements Region.Entry {
@Unretained
public Object getValue() {
+ return getValue(true);
+ }
+
+ @Unretained
+ public Object getValue(boolean createIfAbsent) {
checkTX();
// Object value = this.localRegion.getDeserialized(this.key, false, this.myTX,
// this.rememberReads);
@Unretained
Object value = this.myTX.getDeserializedValue(keyInfo, this.localRegion, false, false, false,
- null, false, false);
+ null, false, false, createIfAbsent);
if (value == null) {
throw new EntryDestroyedException(this.keyInfo.getKey().toString());
} else if (Token.isInvalid(value)) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index e5e09e0..d074857 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -1587,8 +1587,8 @@ public class TXState implements TXStateInterface {
@Override
public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats,
boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent,
- boolean returnTombstones, boolean retainResult) {
- TXEntryState tx = txReadEntry(keyInfo, localRegion, true, true/* create txEntry is absent */);
+ boolean returnTombstones, boolean retainResult, boolean createIfAbsent) {
+ TXEntryState tx = txReadEntry(keyInfo, localRegion, true, createIfAbsent);
if (tx != null) {
Object v = tx.getValue(keyInfo, localRegion, preferCD);
if (!disableCopyOnRead) {
@@ -1739,18 +1739,18 @@ public class TXState implements TXStateInterface {
preferCD, requestingClient, clientEvent, returnTombstones);
}
- private boolean readEntryAndCheckIfDestroyed(KeyInfo keyInfo, LocalRegion localRegion,
- boolean rememberReads) {
- TXEntryState tx =
- txReadEntry(keyInfo, localRegion, rememberReads, true/* create txEntry is absent */);
- if (tx != null) {
- if (!tx.existsLocally()) {
+ private TXEntryState readEntryAndCheckIfDestroyed(KeyInfo keyInfo, LocalRegion localRegion,
+ boolean rememberReads, boolean createIfAbsent) {
+ TXEntryState txEntryState =
+ txReadEntry(keyInfo, localRegion, rememberReads, createIfAbsent);
+ if (txEntryState != null) {
+ if (!txEntryState.existsLocally()) {
// It was destroyed by the transaction so skip
// this key and try the next one
- return true; // fix for bug 34583
+ return null; // fix for bug 34583
}
}
- return false;
+ return txEntryState;
}
/*
@@ -1776,14 +1776,15 @@ public class TXState implements TXStateInterface {
}
}
}
- if (!readEntryAndCheckIfDestroyed(curr, currRgn, rememberReads)) {
+ TXEntryState txEntryState =
+ readEntryAndCheckIfDestroyed(curr, currRgn, rememberReads, allowTombstones);
+ if (txEntryState != null) {
// need to create KeyInfo since higher level iterator may reuse KeyInfo
return new TXEntry(currRgn,
new KeyInfo(curr.getKey(), curr.getCallbackArg(), curr.getBucketId()), proxy,
rememberReads);
- } else {
- return null;
}
+ return null;
}
/*
@@ -1796,11 +1797,13 @@ public class TXState implements TXStateInterface {
public Object getKeyForIterator(KeyInfo curr, LocalRegion currRgn, boolean rememberReads,
boolean allowTombstones) {
assert !(curr.getKey() instanceof RegionEntry);
- if (!readEntryAndCheckIfDestroyed(curr, currRgn, rememberReads)) {
+ TXEntryState txEntryState =
+ readEntryAndCheckIfDestroyed(curr, currRgn, rememberReads, allowTombstones);
+ if (txEntryState != null) {
+ // txEntry is created/read into txState.
return curr.getKey();
- } else {
- return null;
}
+ return null;
}
/*
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateInterface.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateInterface.java
index 21784da..0e56e2d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateInterface.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateInterface.java
@@ -110,13 +110,6 @@ public interface TXStateInterface extends Synchronization, InternalDataView {
*/
Entry getEntry(final KeyInfo keyInfo, final LocalRegion region, boolean allowTombstones);
- /**
- * @param updateStats TODO
- */
- Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats,
- boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent,
- boolean returnTombstones, boolean retainResult);
-
TXEvent getEvent();
TXRegionState txWriteRegion(final InternalRegion internalRegion, final KeyInfo entryKey);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
index 645ba36..a8c53b4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
@@ -290,9 +290,9 @@ public class TXStateProxyImpl implements TXStateProxy {
@Override
public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats,
boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent,
- boolean returnTombstones, boolean retainResult) {
+ boolean returnTombstones, boolean retainResult, boolean createIfAbsent) {
Object val = getRealDeal(keyInfo, localRegion).getDeserializedValue(keyInfo, localRegion,
- updateStats, disableCopyOnRead, preferCD, null, false, retainResult);
+ updateStats, disableCopyOnRead, preferCD, null, false, retainResult, createIfAbsent);
if (val != null) {
// fixes bug 51057: TXStateStub on client always returns null, so do not increment
// the operation count it will be incremented in findObject()
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java
index 9e3e738..d34a416 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java
@@ -189,7 +189,7 @@ public abstract class TXStateStub implements TXStateInterface {
@Override
public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats,
boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent,
- boolean returnTombstones, boolean retainResult) {
+ boolean returnTombstones, boolean retainResult, boolean createIfAbsent) {
// We never have a local value if we are a stub...
return null;
}