Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/04/27 15:06:07 UTC

[GitHub] [ignite] nizhikov opened a new pull request #9054: [Don't merge] IGNITE-CDC

nizhikov opened a new pull request #9054:
URL: https://github.com/apache/ignite/pull/9054


   This PR introduces CDC application for Ignite.
   
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes changes that have been made. 
   The description explains _WHAT_ and _WHY_ was made instead of _HOW_.
   - [ ] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-XXXX Change summary` where `XXXX` - number of JIRA issue.
   - [ ] A reviewer has been mentioned through the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask for advice on the http://asf.slack.com _#ignite_ channel.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] nizhikov commented on a change in pull request #9054: [WIP] IGNITE-CDC

Posted by GitBox <gi...@apache.org>.
nizhikov commented on a change in pull request #9054:
URL: https://github.com/apache/ignite/pull/9054#discussion_r632981564



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
##########
@@ -2199,6 +2200,7 @@ private static void writeDataStorageConfiguration(BinaryRawWriter w, DataStorage
             w.writeInt(cfg.getPageSize());
             w.writeInt(cfg.getConcurrencyLevel());
             w.writeLong(cfg.getWalAutoArchiveAfterInactivity());
+            w.writeLong(cfg.getWalForceArchiveTimeout());

Review comment:
       This code is responsible for "platform" interaction, i.e. the .NET and Ignite versions are always the same.
   No need to provide compatibility here.

##########
File path: modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
##########
@@ -241,6 +242,7 @@ internal DataStorageConfiguration(IBinaryRawReader reader)
             PageSize = reader.ReadInt();
             ConcurrencyLevel = reader.ReadInt();
             WalAutoArchiveAfterInactivity = reader.ReadLongAsTimespan();
+            WalForceArchiveTimeout = reader.ReadLongAsTimespan();

Review comment:
       This code is responsible for "platform" interaction, i.e. the .NET and Ignite versions are always the same.
   No need to provide compatibility here.







[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9054: [WIP] IGNITE-CDC

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9054:
URL: https://github.com/apache/ignite/pull/9054#discussion_r632461046



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
##########
@@ -3453,9 +3489,9 @@ private TimeoutRollover(long endTime) {
                             new Time(U.currentTimeMillis()).toString() + ")");
                     }
 
-                    checkWalRolloverRequiredDuringInactivityPeriod();
+                    checkWalRolloverRequired();

Review comment:
       how about `checkWalFlushAndRolloverRequired()` ?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
##########
@@ -774,25 +793,38 @@ private void scheduleNextInactivityPeriodElapsedCheck() {
     }
 
     /**
-     * Checks if there was elapsed significant period of inactivity. If WAL auto-archive is enabled using
-     * {@link #walAutoArchiveAfterInactivity} > 0 this method will activate roll over by timeout.<br>
+     * Checks if there was elapsed significant period of inactivity or force archive timeout.
+     * If WAL auto-archive is enabled using {@link #walAutoArchiveAfterInactivity} > 0 or {@link #walForceArchiveTimeout}
+     * this method will activate roll over by timeout.
      */
-    private void checkWalRolloverRequiredDuringInactivityPeriod() {
-        if (walAutoArchiveAfterInactivity <= 0)
-            return; // feature not configured, nothing to do
+    private void checkWalRolloverRequired() {
+        if (walAutoArchiveAfterInactivity <= 0 && walForceArchiveTimeout <= 0)
+            return; // feature not configured, nothing to do.
 
         final long lastRecMs = lastRecordLoggedMs.get();
 
         if (lastRecMs == 0)
-            return; //no records were logged to current segment, does not consider inactivity
+            return; //no records were logged to current segment, does not consider inactivity.
 
-        final long elapsedMs = U.currentTimeMillis() - lastRecMs;
+        if (walForceArchiveTimeout > 0) {
+            final long lastRollover = lastRolloverMs.get();
+            final long elapsedMs = U.currentTimeMillis() - lastRollover;
 
-        if (elapsedMs <= walAutoArchiveAfterInactivity)
-            return; // not enough time elapsed since last write
+            if (elapsedMs < walForceArchiveTimeout)
+                return; // not enough time elapsed since last rollover.
 
-        if (!lastRecordLoggedMs.compareAndSet(lastRecMs, 0))
-            return; // record write occurred concurrently
+            if (!lastRolloverMs.compareAndSet(lastRollover, 0))
+                return; // record write occurred concurrently.
+        }
+        else {
+            final long elapsedMs = U.currentTimeMillis() - lastRecMs;
+
+            if (elapsedMs <= walAutoArchiveAfterInactivity)
+                return; // not enough time elapsed since last write.
+
+            if (!lastRecordLoggedMs.compareAndSet(lastRecMs, 0))
+                return; // record write occurred concurrently.
+        }

Review comment:
       looks like duplicated code, could we simplify this?
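
One possible simplification, as a minimal sketch only: pick the timestamp and the timeout once, then run a single elapsed-time check and a single CAS. The class name `RolloverCheckSketch` and the explicit `nowMs` parameter are hypothetical and exist only to make the example self-contained. The sketch also assumes the two boundary checks (`<` for the force timeout, `<=` for inactivity) can be unified to `<`, which is a small behavior change for the inactivity case.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the two timeout branches of checkWalRolloverRequired(). */
public class RolloverCheckSketch {
    /**
     * Returns true if a rollover should be triggered at nowMs.
     * Field names mirror the diff above; everything else is an assumption.
     */
    static boolean rolloverRequired(
        AtomicLong lastRecordLoggedMs,
        AtomicLong lastRolloverMs,
        long walAutoArchiveAfterInactivity,
        long walForceArchiveTimeout,
        long nowMs
    ) {
        if (walAutoArchiveAfterInactivity <= 0 && walForceArchiveTimeout <= 0)
            return false; // Feature not configured, nothing to do.

        if (lastRecordLoggedMs.get() == 0)
            return false; // No records were logged to the current segment.

        // Select the timestamp and timeout once instead of duplicating the checks.
        boolean force = walForceArchiveTimeout > 0;

        AtomicLong ts = force ? lastRolloverMs : lastRecordLoggedMs;
        long timeout = force ? walForceArchiveTimeout : walAutoArchiveAfterInactivity;

        long last = ts.get();

        if (nowMs - last < timeout)
            return false; // Not enough time elapsed.

        // Only the caller that wins the CAS triggers the rollover.
        return ts.compareAndSet(last, 0);
    }
}
```

Whether the unified boundary is acceptable depends on how strictly the inactivity timeout is specified.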

##########
File path: modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
##########
@@ -241,6 +242,7 @@ internal DataStorageConfiguration(IBinaryRawReader reader)
             PageSize = reader.ReadInt();
             ConcurrencyLevel = reader.ReadInt();
             WalAutoArchiveAfterInactivity = reader.ReadLongAsTimespan();
+            WalForceArchiveTimeout = reader.ReadLongAsTimespan();

Review comment:
       Must be relocated to be the last read.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
##########
@@ -342,6 +365,43 @@ private boolean checkWhetherWALRelatedEventFired(int evtType) throws Exception {
         return evtRecorded.get();
     }
 
+    /**
+     * Tests force time out based WAL segment archiving.
+     *
+     * @throws Exception if failure occurs.
+     */
+    @Test
+    public void testForceArchiveSegment() throws Exception {
+        AtomicBoolean waitingForEvt = new AtomicBoolean();
+
+        CountDownLatch forceArchiveSegment = new CountDownLatch(1);
+
+        forceArchiveSegmentMs = 1000;
+
+        Ignite ignite = startGrid();
+
+        ignite.cluster().state(ACTIVE);
+
+        IgniteEvents evts = ignite.events();
+
+        evts.localListen(e -> {
+            if (waitingForEvt.get())
+                forceArchiveSegment.countDown();
+
+            return true;
+        }, EVT_WAL_SEGMENT_ARCHIVED);
+
+        putDummyRecords(ignite, 100);
+
+        waitingForEvt.set(true); // Flag for skipping regular log() and rollOver().
+
+        boolean recordedAfterSleep = forceArchiveSegment.await(forceArchiveSegmentMs + 1001, TimeUnit.MILLISECONDS);

Review comment:
       Flaky test detected.
   There is no guarantee that this will happen within 1001 ms.
   The system may have limited resources, which can cause a delay.
   Also, a rollover may happen between `putDummyRecords(ignite, 100);` and `waitingForEvt.set(true)`.
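
A minimal sketch of a less timing-sensitive wait, under stated assumptions: the archive event is simulated here by a scheduled task, and `GenerousAwaitSketch` and `awaitEvent` are hypothetical names, not part of the PR. It illustrates two points: the latch is armed before any work starts, so an early event cannot be missed, and a generous upper bound costs nothing in the normal case because `await` returns as soon as the latch is counted down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of waiting for an event with a generous timeout. */
public class GenerousAwaitSketch {
    /**
     * Simulates an event firing after evtDelayMs and waits up to maxWaitMs for it.
     * Returns true if the event was observed within the limit.
     */
    static boolean awaitEvent(long evtDelayMs, long maxWaitMs) {
        // Armed before the "work" starts, so no event can slip past it.
        CountDownLatch archived = new CountDownLatch(1);

        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();

        try {
            // Stand-in for the real listener callback.
            Runnable fireEvt = archived::countDown;

            exec.schedule(fireEvt, evtDelayMs, TimeUnit.MILLISECONDS);

            // Returns as soon as the event fires; maxWaitMs is only an upper bound.
            return archived.await(maxWaitMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            return false;
        }
        finally {
            exec.shutdownNow();
        }
    }
}
```

In the real test, dropping the `waitingForEvt` flag is only valid if no other archive event can fire while the dummy records are written, which is an assumption that would need to be checked.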

##########
File path: modules/core/src/test/java/org/apache/ignite/testframework/junits/logger/GridTestLog4jLogger.java
##########
@@ -102,9 +102,9 @@
     /** Quiet flag. */
     private final boolean quiet;
 
-    /** Node ID. */
+    /** Postfix. */
     @GridToStringExclude
-    private UUID nodeId;
+    private String postfix;

Review comment:
       has no usages

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorDataStorageConfiguration.java
##########
@@ -425,6 +436,7 @@ public boolean isWalCompactionEnabled() {
         out.writeInt(metricsSubIntervalCount);
         out.writeLong(metricsRateTimeInterval);
         out.writeLong(walAutoArchiveAfterInactivity);
+        out.writeLong(walForceArchiveTimeout);

Review comment:
       will this break compatibility?
   since this is an experimental flag, will this break it again on refactoring?
   
   anyway, we MUST write/read this after all others to solve compatibility issues.
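
To illustrate why the new field has to go last, here is a minimal sketch assuming a version byte at the start of the payload. `AppendLastFieldSketch`, `V1` and `V2` are hypothetical names; the real Visor classes have their own versioning mechanism, which this example does not reproduce.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical illustration of appending a new field after all existing ones. */
public class AppendLastFieldSketch {
    /** Payload versions: V2 adds walForceArchiveTimeout at the end. */
    static final byte V1 = 1, V2 = 2;

    /** Writes the payload in the layout of the given version. */
    static byte[] write(byte ver, long walAutoArchiveAfterInactivity, long walForceArchiveTimeout) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();

        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeByte(ver);
            out.writeLong(walAutoArchiveAfterInactivity);

            // The new field is written last, so the V1 prefix stays unchanged.
            if (ver >= V2)
                out.writeLong(walForceArchiveTimeout);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return bos.toByteArray();
    }

    /** Reads either layout; returns {walAutoArchiveAfterInactivity, walForceArchiveTimeout}. */
    static long[] read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            byte ver = in.readByte();

            long inactivity = in.readLong();

            // Old payloads carry no force timeout; fall back to 0 (disabled).
            long force = ver >= V2 ? in.readLong() : 0L;

            return new long[] {inactivity, force};
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the field were inserted in the middle instead, a reader of the older layout would interpret its bytes as one of the existing fields.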

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorDataStorageConfiguration.java
##########
@@ -116,9 +116,12 @@
     /** Time interval (in milliseconds) for rate-based metrics. */
     private long metricsRateTimeInterval;
 
-    /** Time interval (in milliseconds) for running auto archiving for incompletely WAL segment */
+    /** Time interval (in milliseconds) for running auto archiving for incompletely WAL segment. */
     private long walAutoArchiveAfterInactivity;
 
+    /** Time interval (in milliseconds) for running auto archiving for incompletely WAL segment. */

Review comment:
       duplicated description

##########
File path: modules/dev-utils/src/main/java/org/apache/ignite/development/utils/WalStat.java
##########
@@ -127,7 +127,7 @@ void registerRecord(WALRecord record, WALPointer walPointer, boolean workDir) {
 
         if (type == WALRecord.RecordType.PAGE_RECORD)
             registerPageSnapshot((PageSnapshot)record);
-        else if (type == WALRecord.RecordType.DATA_RECORD || type == WALRecord.RecordType.MVCC_DATA_RECORD)
+        else if (type == WALRecord.RecordType.DATA_RECORD || type == WALRecord.RecordType.DATA_RECORD_V2 || type == WALRecord.RecordType.MVCC_DATA_RECORD)

Review comment:
       newline required

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
##########
@@ -2622,8 +2636,12 @@ private IgniteLogger initLogger(@Nullable IgniteLogger cfgLog, UUID nodeId, Stri
                     ((JavaLogger)cfgLog).setWorkDirectory(workDir);
 
                 // Set node IDs for all file appenders.
-                if (cfgLog instanceof LoggerNodeIdAware)
-                    ((LoggerNodeIdAware)cfgLog).setNodeId(nodeId);
+                if (cfgLog instanceof LoggerNodeIdAware) {
+                    if (nodeId == null && cfgLog instanceof LoggerPostfixAware)
+                        ((LoggerPostfixAware)cfgLog).setPostfix(postfix);

Review comment:
       postfix is always null

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
##########
@@ -2557,8 +2558,21 @@ private void initializeDefaultSpi(IgniteConfiguration cfg) {
          * @return Initialized logger.
          * @throws IgniteCheckedException If failed.
          */
-        @SuppressWarnings("ErrorNotRethrown")
         private IgniteLogger initLogger(@Nullable IgniteLogger cfgLog, UUID nodeId, String workDir)
+            throws IgniteCheckedException {
+            return initLogger(cfgLog, nodeId, null, workDir);

Review comment:
       this method has single usage, always with postfix == null.

##########
File path: modules/core/src/main/java/org/apache/ignite/logger/java/JavaLogger.java
##########
@@ -368,14 +372,27 @@ public void setWorkDirectory(String workDir) {
     @Override public void setNodeId(UUID nodeId) {
         A.notNull(nodeId, "nodeId");
 
-        if (this.nodeId != null)
+        postfix(nodeId, U.id8(nodeId));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setPostfix(String postfix) {

Review comment:
       always used to set node id.
   any reason to have this method, field, class, etc?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
##########
@@ -748,21 +758,30 @@ private void checkWalConfiguration() throws IgniteCheckedException {
         if (mode == WALMode.BACKGROUND)
             backgroundFlushSchedule = cctx.time().schedule(this::doFlush, flushFreq, flushFreq);
 
-        if (walAutoArchiveAfterInactivity > 0)
-            scheduleNextInactivityPeriodElapsedCheck();
+        if (walAutoArchiveAfterInactivity > 0 || walForceArchiveTimeout > 0)
+            scheduleNextRolloverCheck();
     }
 
     /**
-     * Schedules next check of inactivity period expired. Based on current record update timestamp. At timeout method
-     * does check of inactivity period and schedules new launch.
+     * Schedules next rollover check.
+     * If {@link DataStorageConfiguration#getWalForceArchiveTimeout()} configured rollover happens forcefully.
+     * Else check based on current record update timestamp and at timeout method does check of inactivity period and schedules new launch.
      */
-    private void scheduleNextInactivityPeriodElapsedCheck() {
-        assert walAutoArchiveAfterInactivity > 0;
+    private void scheduleNextRolloverCheck() {
+        assert walAutoArchiveAfterInactivity > 0 || walForceArchiveTimeout > 0;
         assert timeoutRolloverMux != null;
 
         synchronized (timeoutRolloverMux) {
-            long lastRecMs = lastRecordLoggedMs.get();
-            long nextEndTime = lastRecMs <= 0 ? U.currentTimeMillis() : lastRecMs + walAutoArchiveAfterInactivity;
+            long nextEndTime;
+
+            if (walForceArchiveTimeout > 0) {
+                long lastRollover = lastRolloverMs.get();
+                nextEndTime = lastRollover == 0 ? U.currentTimeMillis() : lastRollover + walForceArchiveTimeout;
+            }
+            else {
+                long lastRecMs = lastRecordLoggedMs.get();
+                nextEndTime = lastRecMs <= 0 ? U.currentTimeMillis() : lastRecMs + walAutoArchiveAfterInactivity;
+            }

Review comment:
       looks like duplicated code, could we simplify this?
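
A minimal sketch of one way to remove the duplication: extract the shared "last timestamp plus timeout" rule into a helper. `NextRolloverTimeSketch` and both method names are hypothetical, and the sketch assumes the `== 0` and `<= 0` checks can be unified to `<= 0` because the timestamps are never negative.

```java
/** Hypothetical helper for computing the next rollover check time. */
public class NextRolloverTimeSketch {
    /** Shared rule: check immediately if there is no timestamp yet, otherwise at lastTs + timeoutMs. */
    static long nextEndTime(long lastTs, long timeoutMs, long nowMs) {
        return lastTs <= 0 ? nowMs : lastTs + timeoutMs;
    }

    /** The force timeout takes priority over the inactivity timeout, as in the diff above. */
    static long nextRolloverCheck(
        long lastRolloverMs,
        long lastRecordLoggedMs,
        long walForceArchiveTimeout,
        long walAutoArchiveAfterInactivity,
        long nowMs
    ) {
        return walForceArchiveTimeout > 0
            ? nextEndTime(lastRolloverMs, walForceArchiveTimeout, nowMs)
            : nextEndTime(lastRecordLoggedMs, walAutoArchiveAfterInactivity, nowMs);
    }
}
```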

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
##########
@@ -2199,6 +2200,7 @@ private static void writeDataStorageConfiguration(BinaryRawWriter w, DataStorage
             w.writeInt(cfg.getPageSize());
             w.writeInt(cfg.getConcurrencyLevel());
             w.writeLong(cfg.getWalAutoArchiveAfterInactivity());
+            w.writeLong(cfg.getWalForceArchiveTimeout());

Review comment:
       MUST this write be the last one to keep compatibility?

##########
File path: modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
##########
@@ -785,7 +785,7 @@ public void testWalRenameDirSimple() throws Exception {
      */
     private File cacheDir(final String cacheName, final String consId) throws IgniteCheckedException {
         final String subfolderName
-            = PdsConsistentIdProcessor.genNewStyleSubfolderName(0, UUID.fromString(consId));
+            = genNewStyleSubfolderName(0, UUID.fromString(consId));

Review comment:
       no reason for newline

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
##########
@@ -996,6 +1056,189 @@ private void runRemoveOperationTest(CacheAtomicityMode mode) throws Exception {
             deletesFound != null && deletesFound > 0);
     }
 
+    /**
+     * Tests transaction generation and WAL for putAll cache operation.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testPrimaryFlagOnTwoNodes() throws Exception {

Review comment:
       we still need to test streaming on an unstable topology, with the assertion that the streamed entries cnt == cnt recorded as primary.
   single and multi-key tx should be checked as well as atomics.







[GitHub] [ignite] nizhikov merged pull request #9054: IGNITE-13581 Change Data Capture implementation

Posted by GitBox <gi...@apache.org>.
nizhikov merged pull request #9054:
URL: https://github.com/apache/ignite/pull/9054


   





[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9054: [WIP] IGNITE-CDC

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9054:
URL: https://github.com/apache/ignite/pull/9054#discussion_r632480162



##########
File path: modules/core/src/main/java/org/apache/ignite/logger/java/JavaLogger.java
##########
@@ -368,14 +372,27 @@ public void setWorkDirectory(String workDir) {
     @Override public void setNodeId(UUID nodeId) {
         A.notNull(nodeId, "nodeId");
 
-        if (this.nodeId != null)
+        postfix(nodeId, U.id8(nodeId));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setPostfix(String postfix) {

Review comment:
       always used to set node id. 
   any reason to have this method, field, class, etc?








[GitHub] [ignite] nizhikov commented on a change in pull request #9054: [WIP] IGNITE-CDC

Posted by GitBox <gi...@apache.org>.
nizhikov commented on a change in pull request #9054:
URL: https://github.com/apache/ignite/pull/9054#discussion_r632981311



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorDataStorageConfiguration.java
##########
@@ -425,6 +436,7 @@ public boolean isWalCompactionEnabled() {
         out.writeInt(metricsSubIntervalCount);
         out.writeLong(metricsRateTimeInterval);
         out.writeLong(walAutoArchiveAfterInactivity);
+        out.writeLong(walForceArchiveTimeout);

Review comment:
       Good catch, thanks.
   
   I don't know if anyone is still using this piece of code, but there is a compatibility implementation here.
   Fixed.



