You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/12/19 11:36:37 UTC

[ignite] branch master updated: IGNITE-18404 Stop CDC consumer in shutdown hook (#10450)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 232df17802b IGNITE-18404 Stop CDC consumer in shutdown hook (#10450)
232df17802b is described below

commit 232df17802b6bcac66552f3fbecb7ee34e2130be
Author: Nikolay <ni...@apache.org>
AuthorDate: Mon Dec 19 14:36:28 2022 +0300

    IGNITE-18404 Stop CDC consumer in shutdown hook (#10450)
---
 .../java/org/apache/ignite/internal/cdc/CdcMain.java   | 18 ++++++++++++++----
 .../ignite/startup/cmdline/CdcCommandLineStartup.java  |  4 ----
 .../apache/ignite/cdc/CdcNonDefaultWorkDirTest.java    |  2 +-
 3 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index 4307c326c86..e16e1b409f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -219,6 +219,9 @@ public class CdcMain implements Runnable {
     /** Caches state. */
     private Map<Integer, Long> cachesState;
 
+    /** Started flag. Set after the consumer has been started. */
+    private volatile boolean started;
+
     /** Stopped flag. */
     private volatile boolean stopped;
 
@@ -314,19 +317,21 @@ public class CdcMain implements Runnable {
 
                 consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")));
 
+                started = true;
+
                 try {
                     consumeWalSegmentsUntilStopped();
                 }
                 finally {
-                    consumer.stop();
-
-                    if (log.isInfoEnabled())
-                        log.info("Ignite Change Data Capture Application stopped.");
+                    stop();
                 }
             }
             finally {
                 for (GridComponent comp : kctx)
                     comp.stop(false);
+
+                if (log.isInfoEnabled())
+                    log.info("Ignite Change Data Capture Application stopped.");
             }
         }
     }
@@ -811,10 +816,15 @@ public class CdcMain implements Runnable {
     /** Stops the application. */
     public void stop() {
         synchronized (this) {
+            if (stopped || !started)
+                return;
+
             if (log.isInfoEnabled())
                 log.info("Stopping Change Data Capture service instance");
 
             stopped = true;
+
+            consumer.stop();
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CdcCommandLineStartup.java b/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CdcCommandLineStartup.java
index e7f82ba1951..c390f95338f 100644
--- a/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CdcCommandLineStartup.java
+++ b/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CdcCommandLineStartup.java
@@ -94,10 +94,6 @@ public class CdcCommandLineStartup {
 
             appThread.join();
         }
-        catch (InterruptedException e) {
-            if (cdc.get() != null)
-                cdc.get().stop();
-        }
         catch (Throwable e) {
             e.printStackTrace();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java
index b0fd392f4fa..7d05a8e549e 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java
@@ -65,7 +65,7 @@ public class CdcNonDefaultWorkDirTest extends GridCommonAbstractTest {
     @Override protected void afterTest() throws Exception {
         U.delete(new File(DFLT_WORK_DIR));
         U.nullifyHomeDirectory();
-        U.getIgniteHome();
+        U.setIgniteHome(DFLT_WORK_DIR);
     }
 
     /** Tests CDC start with non default work directory. */