Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/05/14 12:27:15 UTC

[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9054: [WIP] IGNITE-CDC

anton-vinogradov commented on a change in pull request #9054:
URL: https://github.com/apache/ignite/pull/9054#discussion_r632461046



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
##########
@@ -3453,9 +3489,9 @@ private TimeoutRollover(long endTime) {
                             new Time(U.currentTimeMillis()).toString() + ")");
                     }
 
-                    checkWalRolloverRequiredDuringInactivityPeriod();
+                    checkWalRolloverRequired();

Review comment:
       How about `checkWalFlushAndRolloverRequired()`?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
##########
@@ -774,25 +793,38 @@ private void scheduleNextInactivityPeriodElapsedCheck() {
     }
 
     /**
-     * Checks if there was elapsed significant period of inactivity. If WAL auto-archive is enabled using
-     * {@link #walAutoArchiveAfterInactivity} > 0 this method will activate roll over by timeout.<br>
+     * Checks if there was elapsed significant period of inactivity or force archive timeout.
+     * If WAL auto-archive is enabled using {@link #walAutoArchiveAfterInactivity} > 0 or {@link #walForceArchiveTimeout}
+     * this method will activate roll over by timeout.
      */
-    private void checkWalRolloverRequiredDuringInactivityPeriod() {
-        if (walAutoArchiveAfterInactivity <= 0)
-            return; // feature not configured, nothing to do
+    private void checkWalRolloverRequired() {
+        if (walAutoArchiveAfterInactivity <= 0 && walForceArchiveTimeout <= 0)
+            return; // feature not configured, nothing to do.
 
         final long lastRecMs = lastRecordLoggedMs.get();
 
         if (lastRecMs == 0)
-            return; //no records were logged to current segment, does not consider inactivity
+            return; //no records were logged to current segment, does not consider inactivity.
 
-        final long elapsedMs = U.currentTimeMillis() - lastRecMs;
+        if (walForceArchiveTimeout > 0) {
+            final long lastRollover = lastRolloverMs.get();
+            final long elapsedMs = U.currentTimeMillis() - lastRollover;
 
-        if (elapsedMs <= walAutoArchiveAfterInactivity)
-            return; // not enough time elapsed since last write
+            if (elapsedMs < walForceArchiveTimeout)
+                return; // not enough time elapsed since last rollover.
 
-        if (!lastRecordLoggedMs.compareAndSet(lastRecMs, 0))
-            return; // record write occurred concurrently
+            if (!lastRolloverMs.compareAndSet(lastRollover, 0))
+                return; // record write occurred concurrently.
+        }
+        else {
+            final long elapsedMs = U.currentTimeMillis() - lastRecMs;
+
+            if (elapsedMs <= walAutoArchiveAfterInactivity)
+                return; // not enough time elapsed since last write.
+
+            if (!lastRecordLoggedMs.compareAndSet(lastRecMs, 0))
+                return; // record write occurred concurrently.
+        }

Review comment:
       looks like duplicated code, could we simplify this?
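
One possible simplification, as a minimal sketch only: the two branches differ just in which timestamp and which timeout they use, so they could share one helper. `RolloverCheckSketch` and `timeoutElapsed` are hypothetical names, not part of the PR. Note that the original branches compare with `<` and `<=` respectively; using `<` for both is an assumption of this sketch.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper, not part of the PR. */
final class RolloverCheckSketch {
    /**
     * Returns true if at least timeoutMs has passed since the timestamp held in lastTs
     * and this caller won the race to reset that timestamp to 0.
     */
    public static boolean timeoutElapsed(AtomicLong lastTs, long timeoutMs, long nowMs) {
        long last = lastTs.get();

        if (nowMs - last < timeoutMs)
            return false; // Not enough time elapsed.

        return lastTs.compareAndSet(last, 0); // False if the timestamp was updated concurrently.
    }
}
```

The caller would then choose the pair once, for example `lastRolloverMs` with `walForceArchiveTimeout` when the force timeout is configured, and `lastRecordLoggedMs` with `walAutoArchiveAfterInactivity` otherwise.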

##########
File path: modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
##########
@@ -241,6 +242,7 @@ internal DataStorageConfiguration(IBinaryRawReader reader)
             PageSize = reader.ReadInt();
             ConcurrencyLevel = reader.ReadInt();
             WalAutoArchiveAfterInactivity = reader.ReadLongAsTimespan();
+            WalForceArchiveTimeout = reader.ReadLongAsTimespan();

Review comment:
       This must be moved so that it is the last read.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
##########
@@ -342,6 +365,43 @@ private boolean checkWhetherWALRelatedEventFired(int evtType) throws Exception {
         return evtRecorded.get();
     }
 
+    /**
+     * Tests force time out based WAL segment archiving.
+     *
+     * @throws Exception if failure occurs.
+     */
+    @Test
+    public void testForceArchiveSegment() throws Exception {
+        AtomicBoolean waitingForEvt = new AtomicBoolean();
+
+        CountDownLatch forceArchiveSegment = new CountDownLatch(1);
+
+        forceArchiveSegmentMs = 1000;
+
+        Ignite ignite = startGrid();
+
+        ignite.cluster().state(ACTIVE);
+
+        IgniteEvents evts = ignite.events();
+
+        evts.localListen(e -> {
+            if (waitingForEvt.get())
+                forceArchiveSegment.countDown();
+
+            return true;
+        }, EVT_WAL_SEGMENT_ARCHIVED);
+
+        putDummyRecords(ignite, 100);
+
+        waitingForEvt.set(true); // Flag for skipping regular log() and rollOver().
+
+        boolean recordedAfterSleep = forceArchiveSegment.await(forceArchiveSegmentMs + 1001, TimeUnit.MILLISECONDS);

Review comment:
       This test is flaky.
   There is no guarantee that this will happen within 1001 ms.
   The system may have limited resources, which can cause a delay.
   Also, a rollover may happen between `putDummyRecords(ignite, 100);` and `waitingForEvt.set(true)`.
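
For the timing half of this concern, a common pattern is to wait on a condition with a generous upper bound instead of a tight one, so that a slow machine only makes the test slower rather than red. The sketch below uses only the JDK; `AwaitSketch` and `waitFor` are hypothetical names, and the sketch does not by itself address a regular rollover landing in the window between the two calls.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical test helper, not part of the PR. */
final class AwaitSketch {
    /** Polls cond until it holds or timeoutMs elapses; returns the last observed value of cond. */
    public static boolean waitFor(BooleanSupplier cond, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;

        while (deadline - System.nanoTime() > 0) {
            if (cond.getAsBoolean())
                return true;

            try {
                Thread.sleep(10); // Short poll interval; the upper bound can be generous.
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Preserve the interrupt flag and stop waiting.

                break;
            }
        }

        return cond.getAsBoolean();
    }
}
```

In the test, the listener could increment a counter, and the assertion could wait for the counter to exceed a value captured earlier, with a timeout well above `forceArchiveSegmentMs`.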

##########
File path: modules/core/src/test/java/org/apache/ignite/testframework/junits/logger/GridTestLog4jLogger.java
##########
@@ -102,9 +102,9 @@
     /** Quiet flag. */
     private final boolean quiet;
 
-    /** Node ID. */
+    /** Postfix. */
     @GridToStringExclude
-    private UUID nodeId;
+    private String postfix;

Review comment:
       has no usages

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorDataStorageConfiguration.java
##########
@@ -425,6 +436,7 @@ public boolean isWalCompactionEnabled() {
         out.writeInt(metricsSubIntervalCount);
         out.writeLong(metricsRateTimeInterval);
         out.writeLong(walAutoArchiveAfterInactivity);
+        out.writeLong(walForceArchiveTimeout);

Review comment:
       Will this break compatibility?
   Since this is an experimental flag, will it break compatibility again on refactoring?
   
   Anyway, we MUST write/read this after all the other fields to avoid compatibility issues.
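
To illustrate why the new field has to go last, here is a minimal JDK-only sketch of an append-only layout: an old payload without the new field still parses, because the reader consumes the old fields in their original order and treats the trailing field as optional. `AppendOnlyFormatSketch` and its methods are hypothetical and are not the Visor serialization code; checking `available()` works for this in-memory stream only, and a real protocol would more likely rely on an explicit version marker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical illustration, not part of the PR. */
final class AppendOnlyFormatSketch {
    /** Old writer: only the pre-existing field. */
    public static byte[] writeOld(long inactivityMs) {
        return write(inactivityMs, null);
    }

    /** New writer: the new field is appended after all existing ones. */
    public static byte[] writeNew(long inactivityMs, long forceTimeoutMs) {
        return write(inactivityMs, forceTimeoutMs);
    }

    private static byte[] write(long inactivityMs, Long forceTimeoutMs) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();

        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeLong(inactivityMs);

            if (forceTimeoutMs != null)
                out.writeLong(forceTimeoutMs);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return buf.toByteArray();
    }

    /** Returns {inactivityMs, forceTimeoutMs}; forceTimeoutMs is 0 when the payload is old. */
    public static long[] read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            long inactivityMs = in.readLong();
            long forceTimeoutMs = in.available() >= Long.BYTES ? in.readLong() : 0L;

            return new long[] {inactivityMs, forceTimeoutMs};
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the new field were written in the middle instead, an old reader would interpret its bytes as whatever field used to come next.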

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorDataStorageConfiguration.java
##########
@@ -116,9 +116,12 @@
     /** Time interval (in milliseconds) for rate-based metrics. */
     private long metricsRateTimeInterval;
 
-    /** Time interval (in milliseconds) for running auto archiving for incompletely WAL segment */
+    /** Time interval (in milliseconds) for running auto archiving for incompletely WAL segment. */
     private long walAutoArchiveAfterInactivity;
 
+    /** Time interval (in milliseconds) for running auto archiving for incompletely WAL segment. */

Review comment:
       duplicated description

##########
File path: modules/dev-utils/src/main/java/org/apache/ignite/development/utils/WalStat.java
##########
@@ -127,7 +127,7 @@ void registerRecord(WALRecord record, WALPointer walPointer, boolean workDir) {
 
         if (type == WALRecord.RecordType.PAGE_RECORD)
             registerPageSnapshot((PageSnapshot)record);
-        else if (type == WALRecord.RecordType.DATA_RECORD || type == WALRecord.RecordType.MVCC_DATA_RECORD)
+        else if (type == WALRecord.RecordType.DATA_RECORD || type == WALRecord.RecordType.DATA_RECORD_V2 || type == WALRecord.RecordType.MVCC_DATA_RECORD)

Review comment:
       newline required

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
##########
@@ -2622,8 +2636,12 @@ private IgniteLogger initLogger(@Nullable IgniteLogger cfgLog, UUID nodeId, Stri
                     ((JavaLogger)cfgLog).setWorkDirectory(workDir);
 
                 // Set node IDs for all file appenders.
-                if (cfgLog instanceof LoggerNodeIdAware)
-                    ((LoggerNodeIdAware)cfgLog).setNodeId(nodeId);
+                if (cfgLog instanceof LoggerNodeIdAware) {
+                    if (nodeId == null && cfgLog instanceof LoggerPostfixAware)
+                        ((LoggerPostfixAware)cfgLog).setPostfix(postfix);

Review comment:
       postfix is always null

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
##########
@@ -2557,8 +2558,21 @@ private void initializeDefaultSpi(IgniteConfiguration cfg) {
          * @return Initialized logger.
          * @throws IgniteCheckedException If failed.
          */
-        @SuppressWarnings("ErrorNotRethrown")
         private IgniteLogger initLogger(@Nullable IgniteLogger cfgLog, UUID nodeId, String workDir)
+            throws IgniteCheckedException {
+            return initLogger(cfgLog, nodeId, null, workDir);

Review comment:
       this method has a single usage, always with postfix == null.

##########
File path: modules/core/src/main/java/org/apache/ignite/logger/java/JavaLogger.java
##########
@@ -368,14 +372,27 @@ public void setWorkDirectory(String workDir) {
     @Override public void setNodeId(UUID nodeId) {
         A.notNull(nodeId, "nodeId");
 
-        if (this.nodeId != null)
+        postfix(nodeId, U.id8(nodeId));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setPostfix(String postfix) {

Review comment:
       This is always used to set the node id.
   Is there any reason to have this method, field, class, etc.?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
##########
@@ -748,21 +758,30 @@ private void checkWalConfiguration() throws IgniteCheckedException {
         if (mode == WALMode.BACKGROUND)
             backgroundFlushSchedule = cctx.time().schedule(this::doFlush, flushFreq, flushFreq);
 
-        if (walAutoArchiveAfterInactivity > 0)
-            scheduleNextInactivityPeriodElapsedCheck();
+        if (walAutoArchiveAfterInactivity > 0 || walForceArchiveTimeout > 0)
+            scheduleNextRolloverCheck();
     }
 
     /**
-     * Schedules next check of inactivity period expired. Based on current record update timestamp. At timeout method
-     * does check of inactivity period and schedules new launch.
+     * Schedules next rollover check.
+     * If {@link DataStorageConfiguration#getWalForceArchiveTimeout()} configured rollover happens forcefully.
+     * Else check based on current record update timestamp and at timeout method does check of inactivity period and schedules new launch.
      */
-    private void scheduleNextInactivityPeriodElapsedCheck() {
-        assert walAutoArchiveAfterInactivity > 0;
+    private void scheduleNextRolloverCheck() {
+        assert walAutoArchiveAfterInactivity > 0 || walForceArchiveTimeout > 0;
         assert timeoutRolloverMux != null;
 
         synchronized (timeoutRolloverMux) {
-            long lastRecMs = lastRecordLoggedMs.get();
-            long nextEndTime = lastRecMs <= 0 ? U.currentTimeMillis() : lastRecMs + walAutoArchiveAfterInactivity;
+            long nextEndTime;
+
+            if (walForceArchiveTimeout > 0) {
+                long lastRollover = lastRolloverMs.get();
+                nextEndTime = lastRollover == 0 ? U.currentTimeMillis() : lastRollover + walForceArchiveTimeout;
+            }
+            else {
+                long lastRecMs = lastRecordLoggedMs.get();
+                nextEndTime = lastRecMs <= 0 ? U.currentTimeMillis() : lastRecMs + walAutoArchiveAfterInactivity;
+            }

Review comment:
       looks like duplicated code, could we simplify this?
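
A possible shape for the shared part, as a sketch under assumptions: both branches compute "now if there is no timestamp yet, otherwise timestamp plus timeout". `NextCheckTimeSketch` and `nextEndTime` are hypothetical names; the original branches test `== 0` and `<= 0` respectively, and treating both as `<= 0` is an assumption here.

```java
/** Hypothetical helper, not part of the PR. */
final class NextCheckTimeSketch {
    /** Returns nowMs when no timestamp has been recorded yet, otherwise lastTs + timeoutMs. */
    public static long nextEndTime(long lastTs, long timeoutMs, long nowMs) {
        return lastTs <= 0 ? nowMs : lastTs + timeoutMs;
    }
}
```

The `if`/`else` in the hunk would then only select which timestamp and timeout to pass in.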

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
##########
@@ -2199,6 +2200,7 @@ private static void writeDataStorageConfiguration(BinaryRawWriter w, DataStorage
             w.writeInt(cfg.getPageSize());
             w.writeInt(cfg.getConcurrencyLevel());
             w.writeLong(cfg.getWalAutoArchiveAfterInactivity());
+            w.writeLong(cfg.getWalForceArchiveTimeout());

Review comment:
       MUST this write be the last one to keep compatibility?

##########
File path: modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
##########
@@ -785,7 +785,7 @@ public void testWalRenameDirSimple() throws Exception {
      */
     private File cacheDir(final String cacheName, final String consId) throws IgniteCheckedException {
         final String subfolderName
-            = PdsConsistentIdProcessor.genNewStyleSubfolderName(0, UUID.fromString(consId));
+            = genNewStyleSubfolderName(0, UUID.fromString(consId));

Review comment:
       no reason for newline

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
##########
@@ -996,6 +1056,189 @@ private void runRemoveOperationTest(CacheAtomicityMode mode) throws Exception {
             deletesFound != null && deletesFound > 0);
     }
 
+    /**
+     * Tests transaction generation and WAL for putAll cache operation.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testPrimaryFlagOnTwoNodes() throws Exception {

Review comment:
       We still need to test streaming on an unstable topology, with the assertion that the streamed entries cnt == the cnt recorded as primary.
   Single-key and multi-key tx should be checked, as well as atomics.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org