You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/11/27 07:31:13 UTC
ignite git commit: IGNITE-10393: MVCC: Fixed streamer with
persistence on. This closes #5497.
Repository: ignite
Updated Branches:
refs/heads/master 25c41fa1d -> c63a60a39
IGNITE-10393: MVCC: Fixed streamer with persistence on. This closes #5497.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c63a60a3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c63a60a3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c63a60a3
Branch: refs/heads/master
Commit: c63a60a39d4131861c98a84440ecb8c67b10ba25
Parents: 25c41fa
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Tue Nov 27 10:31:01 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Nov 27 10:31:01 2018 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 2 +-
.../persistence/pagemem/PageMemoryImpl.java | 2 +-
...aStreamProcessorMvccPersistenceSelfTest.java | 28 +++++++++
.../DataStreamProcessorPersistenceSelfTest.java | 28 +++++++++
.../DataStreamProcessorSelfTest.java | 63 +++++++++++++++++++-
.../testsuites/IgniteBinaryCacheTestSuite.java | 2 +
.../testsuites/IgniteCacheMvccTestSuite.java | 2 +
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
8 files changed, 126 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 5f4f974..bbdff35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -3478,7 +3478,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
expireTime,
partition(),
updateCntr,
- mvccVer
+ mvccVer == null ? MvccUtils.INITIAL_VERSION : mvccVer
)));
} else {
cctx.shared().wal().log(new DataRecord(new DataEntry(
http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index f6aa059..c4b0104 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -53,7 +53,6 @@ import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
-import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
@@ -65,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.freelist.io.PagesListMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccPersistenceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccPersistenceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccPersistenceSelfTest.java
new file mode 100644
index 0000000..9360cab
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccPersistenceSelfTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastreamer;
+
+/**
+ *
+ */
+public class DataStreamProcessorMvccPersistenceSelfTest extends DataStreamProcessorMvccSelfTest {
+ /** {@inheritDoc} */
+ @Override public boolean persistenceEnabled() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorPersistenceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorPersistenceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorPersistenceSelfTest.java
new file mode 100644
index 0000000..7ce4fdd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorPersistenceSelfTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastreamer;
+
+/**
+ *
+ */
+public class DataStreamProcessorPersistenceSelfTest extends DataStreamProcessorSelfTest {
+ /** {@inheritDoc} */
+ @Override public boolean persistenceEnabled() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index 877df2e..39f43e3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@ -42,9 +42,12 @@ import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.IgniteReflectionFactory;
import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
@@ -52,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
@@ -97,6 +101,13 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
/** */
private TestStore store;
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ if (persistenceEnabled())
+ cleanPersistenceDir();
+ }
+
/** {@inheritDoc} */
@Override public void afterTest() throws Exception {
super.afterTest();
@@ -104,6 +115,13 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
useCache = false;
}
+ /**
+ * @return {@code True} if persistent store is enabled for test.
+ */
+ public boolean persistenceEnabled() {
+ return false;
+ }
+
/** {@inheritDoc} */
@SuppressWarnings({"unchecked"})
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -141,6 +159,12 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
}
cfg.setCacheConfiguration(cc);
+
+ if (persistenceEnabled())
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setPersistenceEnabled(true))
+ .setWalMode(WALMode.LOG_ONLY));
}
else {
cfg.setCacheConfiguration();
@@ -225,6 +249,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
Ignite igniteWithoutCache = startGrid(1);
+ afterGridStarted();
+
final IgniteDataStreamer<Integer, Integer> ldr = igniteWithoutCache.dataStreamer(DEFAULT_CACHE_NAME);
ldr.receiver(DataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
@@ -337,7 +363,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
startGrid(1);
startGrid(2);
- awaitPartitionMapExchange();
+ afterGridStarted();
IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);
@@ -422,6 +448,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
Ignite g1 = startGrid(1);
startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used).
+ afterGridStarted();
+
List<Object> arrays = Arrays.<Object>asList(
new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4},
new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8});
@@ -485,6 +513,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
Ignite g1 = grid(idx - 1);
+ afterGridStarted();
+
// Get and configure loader.
final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(DEFAULT_CACHE_NAME);
@@ -589,6 +619,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
try {
Ignite g1 = startGrid(1);
+ afterGridStarted();
+
IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(DEFAULT_CACHE_NAME);
ldr.close(false);
@@ -746,6 +778,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
try {
Ignite g = startGrid();
+ afterGridStarted();
+
final IgniteCache<Integer, Integer> c = g.cache(DEFAULT_CACHE_NAME);
final IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(DEFAULT_CACHE_NAME);
@@ -799,6 +833,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
try {
Ignite g = startGrid();
+ afterGridStarted();
+
IgniteCache<Integer, Integer> c = g.cache(DEFAULT_CACHE_NAME);
IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(DEFAULT_CACHE_NAME);
@@ -835,6 +871,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
try {
Ignite g = startGrid();
+ afterGridStarted();
+
final CountDownLatch latch = new CountDownLatch(9);
g.events().localListen(new IgnitePredicate<Event>() {
@@ -891,6 +929,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
startGrid(2);
startGrid(3);
+ afterGridStarted();
+
for (int i = 0; i < 1000; i++)
storeMap.put(i, i);
@@ -940,6 +980,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
}
finally {
storeMap = null;
+
+ stopAllGrids();
}
}
@@ -955,6 +997,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
startGrid(2);
startGrid(3);
+ afterGridStarted();
+
try (IgniteDataStreamer<String, TestObject> ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
ldr.allowOverwrite(true);
ldr.keepBinary(customKeepBinary());
@@ -988,6 +1032,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
Ignite ignite = startGrid(1);
+ afterGridStarted();
+
final IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
try (IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
@@ -1034,6 +1080,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
Ignite client = startGrid(0);
+ afterGridStarted();
+
final IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
try (IgniteDataStreamer<String, String> ldr = client.dataStreamer(DEFAULT_CACHE_NAME)) {
@@ -1100,6 +1148,19 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
}
/**
+ * Activates grid if necessary and wait for partition map exchange.
+ */
+ private void afterGridStarted() throws InterruptedException {
+ G.allGrids().stream()
+ .filter(g -> !g.cluster().node().isClient())
+ .findAny()
+ .filter(g -> !g.cluster().active())
+ .ifPresent(g -> g.cluster().active(true));
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
*
*/
@SuppressWarnings("PublicInnerClass")
http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java
index 170bb33..5cfe534 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.binary.distributed.dht.GridCa
import org.apache.ignite.internal.processors.cache.binary.distributed.dht.GridCacheAtomicPartitionedOnlyBinaryMultithreadedSelfTest;
import org.apache.ignite.internal.processors.cache.binary.distributed.dht.GridCacheBinariesNearPartitionedByteArrayValuesSelfTest;
import org.apache.ignite.internal.processors.cache.binary.distributed.dht.GridCacheBinariesPartitionedOnlyByteArrayValuesSelfTest;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorPersistenceSelfTest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest;
/**
@@ -51,6 +52,7 @@ public class IgniteBinaryCacheTestSuite extends TestSuite {
// Tests below have a special version for Binary Marshaller
ignoredTests.add(DataStreamProcessorSelfTest.class);
+ ignoredTests.add(DataStreamProcessorPersistenceSelfTest.class);
ignoredTests.add(GridCacheAffinityRoutingSelfTest.class);
ignoredTests.add(IgniteCacheAtomicLocalExpiryPolicyTest.class);
ignoredTests.add(GridCacheEntryMemorySizeSelfTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
index d4b837c..930706d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxFailoverTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccVacuumTest;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCachePeekTest;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUnsupportedTxModesTest;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccPersistenceSelfTest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccSelfTest;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
@@ -59,6 +60,7 @@ public class IgniteCacheMvccTestSuite extends TestSuite {
suite.addTestSuite(CacheMvccConfigurationValidationTest.class);
suite.addTestSuite(DataStreamProcessorMvccSelfTest.class);
+ suite.addTestSuite(DataStreamProcessorMvccPersistenceSelfTest.class);
suite.addTestSuite(CacheMvccOperationChecksTest.class);
suite.addTestSuite(CacheMvccRemoteTxOnNearNodeStartTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 52e2ba2..dd03ef3 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -144,6 +144,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCa
import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxExceptionSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorExternalizableFailedTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorNonSerializableTest;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorPersistenceSelfTest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerClientReconnectAfterClusterRestartTest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImplSelfTest;
@@ -250,6 +251,7 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(GridCacheBalancingStoreSelfTest.class);
suite.addTestSuite(GridCacheAffinityApiSelfTest.class);
suite.addTestSuite(GridCacheStoreValueBytesSelfTest.class);
+ GridTestUtils.addTestIfNeeded(suite, DataStreamProcessorPersistenceSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, DataStreamProcessorSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, DataStreamerUpdateAfterLoadTest.class, ignoredTests);
suite.addTestSuite(DataStreamerMultiThreadedSelfTest.class);