You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/09/09 17:18:35 UTC

[GitHub] [ignite] nizhikov opened a new pull request #9398: IGNITE-14355 CDC Metrics

nizhikov opened a new pull request #9398:
URL: https://github.com/apache/ignite/pull/9398


   Thank you for submitting the pull request to the Apache Ignite.
   
   In order to streamline the review of the contribution 
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes changes that have been made. 
   The description explains _WHAT_ and _WHY_ was made instead of _HOW_.
   - [ ] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-XXXX Change summary` where `XXXX` - number of JIRA issue.
   - [ ] A reviewer has been mentioned through the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask anу advice on http://asf.slack.com _#ignite_ channel.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] nizhikov merged pull request #9398: IGNITE-14355 CDC Metrics

Posted by GitBox <gi...@apache.org>.
nizhikov merged pull request #9398:
URL: https://github.com/apache/ignite/pull/9398


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] nizhikov commented on a change in pull request #9398: IGNITE-14355 CDC Metrics

Posted by GitBox <gi...@apache.org>.
nizhikov commented on a change in pull request #9398:
URL: https://github.com/apache/ignite/pull/9398#discussion_r716767461



##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
##########
@@ -96,29 +183,87 @@ public void addAndWaitForConsumption(
     }
 
     /** */
-    public boolean waitForSize(
+    public void waitForSize(
         int expSz,
         String cacheName,
         CdcSelfTest.ChangeEventType evtType,
-        long timeout,
         TestCdcConsumer<?>... cnsmrs
     ) throws IgniteInterruptedCheckedException {
-        return waitForCondition(
-            () -> {
-                int sum = Arrays.stream(cnsmrs).mapToInt(c -> F.size(c.data(evtType, cacheId(cacheName)))).sum();
-                return sum == expSz;
-            },
-            timeout);
+        assertTrue(waitForCondition(sizePredicate(expSz, cacheName, evtType, cnsmrs), getTestTimeout()));
     }
 
     /** */
-    public CdcConfiguration cdcConfig(CdcConsumer cnsmr) {
-        CdcConfiguration cdcCfg = new CdcConfiguration();
+    protected GridAbsPredicate sizePredicate(
+        int expSz,
+        String cacheName,
+        ChangeEventType evtType,
+        TestCdcConsumer<?>... cnsmrs
+    ) {
+        return () -> {
+            int sum = Arrays.stream(cnsmrs).mapToInt(c -> F.size(c.data(evtType, cacheId(cacheName)))).sum();
+            return sum == expSz;
+        };
+    }
 
-        cdcCfg.setConsumer(cnsmr);
-        cdcCfg.setKeepBinary(false);
+    /** */
+    protected long checkMetrics(CdcMain cdc, int expCnt) throws Exception {
+        if (metricExporters() != null) {
+            IgniteConfiguration cfg = getFieldValue(cdc, "igniteCfg");
+
+            DynamicMBean jmxCdcReg = metricRegistry(cdcInstanceName(cfg.getIgniteInstanceName()), null, "cdc");
+
+            Function<String, ?> jmxVal = m -> {
+                try {
+                    return jmxCdcReg.getAttribute(m);
+                }
+                catch (Exception e) {
+                    throw new IgniteException(e);
+                }
+            };
+
+            checkMetrics(expCnt, (Function<String, Long>)jmxVal, (Function<String, String>)jmxVal);
+        }
+
+        MetricRegistry mreg = getFieldValue(cdc, "mreg");
+
+        assertNotNull(mreg);
+
+        return checkMetrics(
+            expCnt,
+            m -> mreg.<LongMetric>findMetric(m).value(),
+            m -> mreg.<ObjectMetric<String>>findMetric(m).value()
+        );
+    }
+
+    /** */
+    private long checkMetrics(long expCnt, Function<String, Long> longMetric, Function<String, String> strMetric) {
+        long committedSegIdx = longMetric.apply(COMMITTED_SEG_IDX);
+        long curSegIdx = longMetric.apply(CUR_SEG_IDX);
+
+        assertTrue(committedSegIdx <= curSegIdx);
 
-        return cdcCfg;
+        assertTrue(longMetric.apply(COMMITTED_SEG_OFFSET) >= 0);
+        assertTrue(longMetric.apply(LAST_SEG_CONSUMPTION_TIME) > 0);
+
+        assertTrue(longMetric.apply(LAST_EVT_TIME) > 0);
+
+        for (String m : new String[] {BINARY_META_DIR, MARSHALLER_DIR, CDC_DIR})
+            assertTrue(new File(strMetric.apply(m)).exists());
+
+        if (expCnt != -1)

Review comment:
       fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] nizhikov merged pull request #9398: IGNITE-14355 CDC Metrics

Posted by GitBox <gi...@apache.org>.
nizhikov merged pull request #9398:
URL: https://github.com/apache/ignite/pull/9398


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9398: IGNITE-14355 CDC Metrics

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9398:
URL: https://github.com/apache/ignite/pull/9398#discussion_r716498434



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
##########
@@ -244,30 +260,120 @@ public void runX() throws Exception {
                 log.info("Ignite node Marshaller [dir=" + marshaller + ']');
             }
 
-            injectResources(consumer.consumer());
+            StandaloneGridKernalContext kctx = startStandaloneKernal();
+
+            initMetrics();
 
-            state = new CdcConsumerState(cdcDir.resolve(STATE_DIR));
+            try {
+                kctx.resource().injectGeneric(consumer.consumer());
 
-            initState = state.load();
+                state = createState(cdcDir.resolve(STATE_DIR));
 
-            if (initState != null && log.isInfoEnabled())
-                log.info("Initial state loaded [state=" + initState + ']');
+                initState = state.load();
 
-            consumer.start();
+                if (initState != null) {
+                    committedSegmentIdx.value(initState.index());
+                    committedSegmentOffset.value(initState.fileOffset());
 
-            try {
-                consumeWalSegmentsUntilStopped();
+                    if (log.isInfoEnabled())
+                        log.info("Initial state loaded [state=" + initState + ']');
+                }
+
+                consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")));
+
+                try {
+                    consumeWalSegmentsUntilStopped();
+                }
+                finally {
+                    consumer.stop();
+
+                    if (log.isInfoEnabled())
+                        log.info("Ignite Change Data Capture Application stopped.");
+                }
             }
             finally {
-                consumer.stop();
+                for (GridComponent comp : kctx)
+                    comp.stop(false);
+            }
+        }
+    }
+
+    /** Creates consumer state. */
+    protected CdcConsumerState createState(Path stateDir) {
+        return new CdcConsumerState(stateDir);
+    }

Review comment:
       Oneliner with single usage?
   Please get rid of it.

##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
##########
@@ -96,29 +183,87 @@ public void addAndWaitForConsumption(
     }
 
     /** */
-    public boolean waitForSize(
+    public void waitForSize(
         int expSz,
         String cacheName,
         CdcSelfTest.ChangeEventType evtType,
-        long timeout,
         TestCdcConsumer<?>... cnsmrs
     ) throws IgniteInterruptedCheckedException {
-        return waitForCondition(
-            () -> {
-                int sum = Arrays.stream(cnsmrs).mapToInt(c -> F.size(c.data(evtType, cacheId(cacheName)))).sum();
-                return sum == expSz;
-            },
-            timeout);
+        assertTrue(waitForCondition(sizePredicate(expSz, cacheName, evtType, cnsmrs), getTestTimeout()));
     }
 
     /** */
-    public CdcConfiguration cdcConfig(CdcConsumer cnsmr) {
-        CdcConfiguration cdcCfg = new CdcConfiguration();
+    protected GridAbsPredicate sizePredicate(
+        int expSz,
+        String cacheName,
+        ChangeEventType evtType,
+        TestCdcConsumer<?>... cnsmrs
+    ) {
+        return () -> {
+            int sum = Arrays.stream(cnsmrs).mapToInt(c -> F.size(c.data(evtType, cacheId(cacheName)))).sum();
+            return sum == expSz;
+        };
+    }
 
-        cdcCfg.setConsumer(cnsmr);
-        cdcCfg.setKeepBinary(false);
+    /** */
+    protected long checkMetrics(CdcMain cdc, int expCnt) throws Exception {
+        if (metricExporters() != null) {
+            IgniteConfiguration cfg = getFieldValue(cdc, "igniteCfg");
+
+            DynamicMBean jmxCdcReg = metricRegistry(cdcInstanceName(cfg.getIgniteInstanceName()), null, "cdc");
+
+            Function<String, ?> jmxVal = m -> {
+                try {
+                    return jmxCdcReg.getAttribute(m);
+                }
+                catch (Exception e) {
+                    throw new IgniteException(e);
+                }
+            };
+
+            checkMetrics(expCnt, (Function<String, Long>)jmxVal, (Function<String, String>)jmxVal);
+        }
+
+        MetricRegistry mreg = getFieldValue(cdc, "mreg");
+
+        assertNotNull(mreg);
+
+        return checkMetrics(
+            expCnt,
+            m -> mreg.<LongMetric>findMetric(m).value(),
+            m -> mreg.<ObjectMetric<String>>findMetric(m).value()
+        );
+    }
+
+    /** */
+    private long checkMetrics(long expCnt, Function<String, Long> longMetric, Function<String, String> strMetric) {
+        long committedSegIdx = longMetric.apply(COMMITTED_SEG_IDX);
+        long curSegIdx = longMetric.apply(CUR_SEG_IDX);
+
+        assertTrue(committedSegIdx <= curSegIdx);
 
-        return cdcCfg;
+        assertTrue(longMetric.apply(COMMITTED_SEG_OFFSET) >= 0);
+        assertTrue(longMetric.apply(LAST_SEG_CONSUMPTION_TIME) > 0);
+
+        assertTrue(longMetric.apply(LAST_EVT_TIME) > 0);
+
+        for (String m : new String[] {BINARY_META_DIR, MARSHALLER_DIR, CDC_DIR})
+            assertTrue(new File(strMetric.apply(m)).exists());
+
+        if (expCnt != -1)

Review comment:
       we should always check this, see no reason for such ignor.

##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
##########
@@ -246,19 +275,34 @@ public void testMultiNodeConsumption() throws Exception {
         IgniteConfiguration cfg1 = ign1.configuration();
         IgniteConfiguration cfg2 = ign2.configuration();
 
-        CdcMain cdc1 = new CdcMain(cfg1, null, cdcConfig(cnsmr1));
-        CdcMain cdc2 = new CdcMain(cfg2, null, cdcConfig(cnsmr2));
+        // Always run CDC with consistent id to ensure instance read data for specific node.
+        if (!specificConsistentId) {
+            cfg1.setConsistentId((Serializable)ign1.localNode().consistentId());
+            cfg2.setConsistentId((Serializable)ign2.localNode().consistentId());
+        }
+
+        CountDownLatch latch = new CountDownLatch(2);
+
+        GridAbsPredicate sizePredicate1 = sizePredicate(keysCnt[0], DEFAULT_CACHE_NAME, UPDATE, cnsmr1);
+        GridAbsPredicate sizePredicate2 = sizePredicate(keysCnt[1], DEFAULT_CACHE_NAME, UPDATE, cnsmr2);
+
+        CdcMain cdc1 = createCdc(cnsmr1, cfg1, latch, sizePredicate1);
+        CdcMain cdc2 = createCdc(cnsmr2, cfg2, latch, sizePredicate2);
 
         IgniteInternalFuture<?> fut1 = runAsync(cdc1);
         IgniteInternalFuture<?> fut2 = runAsync(cdc2);
 
         addDataFut.get(getTestTimeout());
 
-        addDataFut = runAsync(() -> addData(cache, KEYS_CNT, KEYS_CNT * 2));
+        runAsync(() -> addData(cache, KEYS_CNT, KEYS_CNT * 2)).get(getTestTimeout());
 
-        addDataFut.get(getTestTimeout());
+        // Wait while predicate will become true and state saved on the disk for both cdc.
+        assertTrue(latch.await(getTestTimeout(), MILLISECONDS));
 
-        assertTrue(waitForSize(KEYS_CNT * 2, DEFAULT_CACHE_NAME, UPDATE, getTestTimeout(), cnsmr1, cnsmr2));
+        long evtsCnt1 = checkMetrics(cdc1, -1);
+        long evtsCnt2 = checkMetrics(cdc2, -1);

Review comment:
       ```suggestion
           long evtsCnt1 = checkMetrics(cdc1, keysCnt[0]);
           long evtsCnt2 = checkMetrics(cdc2, keysCnt[1]);
   ```
   please fix this way other -1s




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] nizhikov commented on a change in pull request #9398: IGNITE-14355 CDC Metrics

Posted by GitBox <gi...@apache.org>.
nizhikov commented on a change in pull request #9398:
URL: https://github.com/apache/ignite/pull/9398#discussion_r716767763



##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
##########
@@ -246,19 +275,34 @@ public void testMultiNodeConsumption() throws Exception {
         IgniteConfiguration cfg1 = ign1.configuration();
         IgniteConfiguration cfg2 = ign2.configuration();
 
-        CdcMain cdc1 = new CdcMain(cfg1, null, cdcConfig(cnsmr1));
-        CdcMain cdc2 = new CdcMain(cfg2, null, cdcConfig(cnsmr2));
+        // Always run CDC with consistent id to ensure instance read data for specific node.
+        if (!specificConsistentId) {
+            cfg1.setConsistentId((Serializable)ign1.localNode().consistentId());
+            cfg2.setConsistentId((Serializable)ign2.localNode().consistentId());
+        }
+
+        CountDownLatch latch = new CountDownLatch(2);
+
+        GridAbsPredicate sizePredicate1 = sizePredicate(keysCnt[0], DEFAULT_CACHE_NAME, UPDATE, cnsmr1);
+        GridAbsPredicate sizePredicate2 = sizePredicate(keysCnt[1], DEFAULT_CACHE_NAME, UPDATE, cnsmr2);
+
+        CdcMain cdc1 = createCdc(cnsmr1, cfg1, latch, sizePredicate1);
+        CdcMain cdc2 = createCdc(cnsmr2, cfg2, latch, sizePredicate2);
 
         IgniteInternalFuture<?> fut1 = runAsync(cdc1);
         IgniteInternalFuture<?> fut2 = runAsync(cdc2);
 
         addDataFut.get(getTestTimeout());
 
-        addDataFut = runAsync(() -> addData(cache, KEYS_CNT, KEYS_CNT * 2));
+        runAsync(() -> addData(cache, KEYS_CNT, KEYS_CNT * 2)).get(getTestTimeout());
 
-        addDataFut.get(getTestTimeout());
+        // Wait while predicate will become true and state saved on the disk for both cdc.
+        assertTrue(latch.await(getTestTimeout(), MILLISECONDS));
 
-        assertTrue(waitForSize(KEYS_CNT * 2, DEFAULT_CACHE_NAME, UPDATE, getTestTimeout(), cnsmr1, cnsmr2));
+        long evtsCnt1 = checkMetrics(cdc1, -1);
+        long evtsCnt2 = checkMetrics(cdc2, -1);

Review comment:
       Yes. Thank you.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9398: IGNITE-14355 CDC Metrics

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9398:
URL: https://github.com/apache/ignite/pull/9398#discussion_r714041436



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
##########
@@ -114,12 +115,48 @@
     /** State dir. */
     public static final String STATE_DIR = "state";
 
+    /** Current segment index metric name. */
+    public static final String CUR_SEG_IDX = "CurrentSegmentIndex";
+
+    /** Committed segment index metric name. */
+    public static final String COMMITTED_SEG_IDX = "CommittedSegmentIndex";
+
+    /** Committed segment offset metric name. */
+    public static final String COMMITTED_SEG_OFF = "CommittedSegmentOffset";
+
+    /** Last segment consumption time. */
+    public static final String LAST_SEG_CONSUMPTION_TIME = "LastSegmentConsumptionTime";
+
+    /** Binary metadata metric name. */
+    public static final String BINARY_META = "BinaryMeta";

Review comment:
       DIR & Dir?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
##########
@@ -114,12 +115,39 @@
     /** State dir. */
     public static final String STATE_DIR = "state";
 
+    /** Current segment index metric name. */
+    public static final String CUR_SEG_IDX = "CurrentSegmentIndex";
+
+    /** Committed segment index metric name. */
+    public static final String COMMITTED_SEG_IDX = "CommittedSegmentIndex";
+
+    /** Committed segment offset metric name. */
+    public static final String COMMITTED_SEG_OFF = "CommittedSegmentOffset";

Review comment:
       `OFFSET`, OFF means disable(d)

##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
##########
@@ -111,16 +132,69 @@ public boolean waitForSize(
             timeout);
     }
 
+    /** */
+    public long checkMetrics(CdcMain cdc, int expCnt) throws Exception {

Review comment:
       expCnt never checked

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
##########
@@ -123,6 +137,10 @@ private boolean hasCurrent() {
 
             /** */
             private CdcEvent transform(DataEntry e) {
+                evtsCnt.increment();
+
+                lastEvtTs.value(System.currentTimeMillis());

Review comment:
       It looks like the `next()` method is a better place for counting and ts recording.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
##########
@@ -114,12 +115,48 @@
     /** State dir. */
     public static final String STATE_DIR = "state";
 
+    /** Current segment index metric name. */
+    public static final String CUR_SEG_IDX = "CurrentSegmentIndex";
+
+    /** Committed segment index metric name. */
+    public static final String COMMITTED_SEG_IDX = "CommittedSegmentIndex";
+
+    /** Committed segment offset metric name. */
+    public static final String COMMITTED_SEG_OFF = "CommittedSegmentOffset";
+
+    /** Last segment consumption time. */
+    public static final String LAST_SEG_CONSUMPTION_TIME = "LastSegmentConsumptionTime";
+
+    /** Binary metadata metric name. */
+    public static final String BINARY_META = "BinaryMeta";
+
+    /** Marshaller metric name. */
+    public static final String MARSHALLER = "Marshaller";

Review comment:
       DIR & Dir?

##########
File path: modules/core/src/main/java/org/apache/ignite/spi/metric/jmx/JmxMetricExporterSpi.java
##########
@@ -132,7 +132,7 @@ else if (log.isDebugEnabled())
 
             ObjectName mbean = U.registerMBean(
                 ignite().configuration().getMBeanServer(),
-                igniteInstanceName,
+                igniteInstanceName == null ? ignite().configuration().getIgniteInstanceName() : igniteInstanceName,

Review comment:
       Why no just make `igniteInstanceName == ignite().configuration().getIgniteInstanceName()` instead of `if`?
   Even another question:
   why igniteInstanceName != ignite().configuration().getIgniteInstanceName()`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org