You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/11/27 07:31:13 UTC

ignite git commit: IGNITE-10393: MVCC: Fixed streamer with persistence on. This closes #5497.

Repository: ignite
Updated Branches:
  refs/heads/master 25c41fa1d -> c63a60a39


IGNITE-10393: MVCC: Fixed streamer with persistence on. This closes #5497.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c63a60a3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c63a60a3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c63a60a3

Branch: refs/heads/master
Commit: c63a60a39d4131861c98a84440ecb8c67b10ba25
Parents: 25c41fa
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Tue Nov 27 10:31:01 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Nov 27 10:31:01 2018 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |  2 +-
 .../persistence/pagemem/PageMemoryImpl.java     |  2 +-
 ...aStreamProcessorMvccPersistenceSelfTest.java | 28 +++++++++
 .../DataStreamProcessorPersistenceSelfTest.java | 28 +++++++++
 .../DataStreamProcessorSelfTest.java            | 63 +++++++++++++++++++-
 .../testsuites/IgniteBinaryCacheTestSuite.java  |  2 +
 .../testsuites/IgniteCacheMvccTestSuite.java    |  2 +
 .../ignite/testsuites/IgniteCacheTestSuite.java |  2 +
 8 files changed, 126 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 5f4f974..bbdff35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -3478,7 +3478,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             expireTime,
                             partition(),
                             updateCntr,
-                            mvccVer
+                            mvccVer == null ? MvccUtils.INITIAL_VERSION : mvccVer
                         )));
                     } else {
                         cctx.shared().wal().log(new DataRecord(new DataEntry(

http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index f6aa059..c4b0104 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -53,7 +53,6 @@ import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.PageUtils;
 import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
-import org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
@@ -65,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
 import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
 import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.io.PagesListMetaIO;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccPersistenceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccPersistenceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccPersistenceSelfTest.java
new file mode 100644
index 0000000..9360cab
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccPersistenceSelfTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastreamer;
+
+/**
+ *
+ */
+public class DataStreamProcessorMvccPersistenceSelfTest extends DataStreamProcessorMvccSelfTest {
+    /** {@inheritDoc} */
+    @Override public boolean persistenceEnabled() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorPersistenceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorPersistenceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorPersistenceSelfTest.java
new file mode 100644
index 0000000..7ce4fdd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorPersistenceSelfTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastreamer;
+
+/**
+ *
+ */
+public class DataStreamProcessorPersistenceSelfTest extends DataStreamProcessorSelfTest {
+    /** {@inheritDoc} */
+    @Override public boolean persistenceEnabled() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index 877df2e..39f43e3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@ -42,9 +42,12 @@ import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreAdapter;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.IgniteReflectionFactory;
 import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
@@ -52,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
@@ -97,6 +101,13 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
     /** */
     private TestStore store;
 
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        if (persistenceEnabled())
+            cleanPersistenceDir();
+    }
+
     /** {@inheritDoc} */
     @Override public void afterTest() throws Exception {
         super.afterTest();
@@ -104,6 +115,13 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
         useCache = false;
     }
 
+    /**
+     * @return {@code True} if persistent store is enabled for test.
+     */
+    public boolean persistenceEnabled() {
+        return false;
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked"})
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -141,6 +159,12 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
             }
 
             cfg.setCacheConfiguration(cc);
+
+            if (persistenceEnabled())
+                cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+                    .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                            .setPersistenceEnabled(true))
+                    .setWalMode(WALMode.LOG_ONLY));
         }
         else {
             cfg.setCacheConfiguration();
@@ -225,6 +249,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
 
             Ignite igniteWithoutCache = startGrid(1);
 
+            afterGridStarted();
+
             final IgniteDataStreamer<Integer, Integer> ldr = igniteWithoutCache.dataStreamer(DEFAULT_CACHE_NAME);
 
             ldr.receiver(DataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
@@ -337,7 +363,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
             startGrid(1);
             startGrid(2);
 
-            awaitPartitionMapExchange();
+            afterGridStarted();
 
             IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);
 
@@ -422,6 +448,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
             Ignite g1 = startGrid(1);
             startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used).
 
+            afterGridStarted();
+
             List<Object> arrays = Arrays.<Object>asList(
                 new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4},
                 new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8});
@@ -485,6 +513,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
 
             Ignite g1 = grid(idx - 1);
 
+            afterGridStarted();
+
             // Get and configure loader.
             final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(DEFAULT_CACHE_NAME);
 
@@ -589,6 +619,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
         try {
             Ignite g1 = startGrid(1);
 
+            afterGridStarted();
+
             IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(DEFAULT_CACHE_NAME);
 
             ldr.close(false);
@@ -746,6 +778,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
         try {
             Ignite g = startGrid();
 
+            afterGridStarted();
+
             final IgniteCache<Integer, Integer> c = g.cache(DEFAULT_CACHE_NAME);
 
             final IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(DEFAULT_CACHE_NAME);
@@ -799,6 +833,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
         try {
             Ignite g = startGrid();
 
+            afterGridStarted();
+
             IgniteCache<Integer, Integer> c = g.cache(DEFAULT_CACHE_NAME);
 
             IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(DEFAULT_CACHE_NAME);
@@ -835,6 +871,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
         try {
             Ignite g = startGrid();
 
+            afterGridStarted();
+
             final CountDownLatch latch = new CountDownLatch(9);
 
             g.events().localListen(new IgnitePredicate<Event>() {
@@ -891,6 +929,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
             startGrid(2);
             startGrid(3);
 
+            afterGridStarted();
+
             for (int i = 0; i < 1000; i++)
                 storeMap.put(i, i);
 
@@ -940,6 +980,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
         }
         finally {
             storeMap = null;
+
+            stopAllGrids();
         }
     }
 
@@ -955,6 +997,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
             startGrid(2);
             startGrid(3);
 
+            afterGridStarted();
+
             try (IgniteDataStreamer<String, TestObject> ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
                 ldr.allowOverwrite(true);
                 ldr.keepBinary(customKeepBinary());
@@ -988,6 +1032,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
 
             Ignite ignite = startGrid(1);
 
+            afterGridStarted();
+
             final IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
             try (IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
@@ -1034,6 +1080,8 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
 
             Ignite client = startGrid(0);
 
+            afterGridStarted();
+
             final IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
             try (IgniteDataStreamer<String, String> ldr = client.dataStreamer(DEFAULT_CACHE_NAME)) {
@@ -1100,6 +1148,19 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Activates grid if necessary and wait for partition map exchange.
+     */
+    private void afterGridStarted() throws InterruptedException {
+        G.allGrids().stream()
+            .filter(g -> !g.cluster().node().isClient())
+            .findAny()
+            .filter(g -> !g.cluster().active())
+            .ifPresent(g -> g.cluster().active(true));
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
      *
      */
     @SuppressWarnings("PublicInnerClass")

http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java
index 170bb33..5cfe534 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.binary.distributed.dht.GridCa
 import org.apache.ignite.internal.processors.cache.binary.distributed.dht.GridCacheAtomicPartitionedOnlyBinaryMultithreadedSelfTest;
 import org.apache.ignite.internal.processors.cache.binary.distributed.dht.GridCacheBinariesNearPartitionedByteArrayValuesSelfTest;
 import org.apache.ignite.internal.processors.cache.binary.distributed.dht.GridCacheBinariesPartitionedOnlyByteArrayValuesSelfTest;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorPersistenceSelfTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest;
 
 /**
@@ -51,6 +52,7 @@ public class IgniteBinaryCacheTestSuite extends TestSuite {
 
         // Tests below have a special version for Binary Marshaller
         ignoredTests.add(DataStreamProcessorSelfTest.class);
+        ignoredTests.add(DataStreamProcessorPersistenceSelfTest.class);
         ignoredTests.add(GridCacheAffinityRoutingSelfTest.class);
         ignoredTests.add(IgniteCacheAtomicLocalExpiryPolicyTest.class);
         ignoredTests.add(GridCacheEntryMemorySizeSelfTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
index d4b837c..930706d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxFailoverTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccVacuumTest;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCachePeekTest;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccUnsupportedTxModesTest;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccPersistenceSelfTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccSelfTest;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
@@ -59,6 +60,7 @@ public class IgniteCacheMvccTestSuite extends TestSuite {
         suite.addTestSuite(CacheMvccConfigurationValidationTest.class);
 
         suite.addTestSuite(DataStreamProcessorMvccSelfTest.class);
+        suite.addTestSuite(DataStreamProcessorMvccPersistenceSelfTest.class);
         suite.addTestSuite(CacheMvccOperationChecksTest.class);
 
         suite.addTestSuite(CacheMvccRemoteTxOnNearNodeStartTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c63a60a3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 52e2ba2..dd03ef3 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -144,6 +144,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCa
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxExceptionSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorExternalizableFailedTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorNonSerializableTest;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorPersistenceSelfTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerClientReconnectAfterClusterRestartTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerImplSelfTest;
@@ -250,6 +251,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheBalancingStoreSelfTest.class);
         suite.addTestSuite(GridCacheAffinityApiSelfTest.class);
         suite.addTestSuite(GridCacheStoreValueBytesSelfTest.class);
+        GridTestUtils.addTestIfNeeded(suite, DataStreamProcessorPersistenceSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, DataStreamProcessorSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, DataStreamerUpdateAfterLoadTest.class, ignoredTests);
         suite.addTestSuite(DataStreamerMultiThreadedSelfTest.class);