You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/03/09 08:28:57 UTC

[GitHub] [ignite] nizhikov commented on a change in pull request #9828: IGNITE-14829 Save DataEntry index inside CDC state

nizhikov commented on a change in pull request #9828:
URL: https://github.com/apache/ignite/pull/9828#discussion_r822397254



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
##########
@@ -648,4 +655,89 @@ private void ackAsciiLogo() {
     public static String cdcInstanceName(String igniteInstanceName) {
         return "cdc-" + igniteInstanceName;
     }
+
+    /** Iterator over {@link DataEntry}. */
+    public static class DataEntryIterator implements Iterator<DataEntry>, AutoCloseable {
+        /** WAL iterator. */
+        private final WALIterator walIter;
+
+        /** Last {@link DataRecord}.*/
+        private IgniteBiTuple<WALPointer, WALRecord> lastWalRec;
+
+        /** Iterator over {@link DataEntry}. */
+        private Iterator<DataEntry> entries;
+
+        /** Last {@link DataEntry} index inside last {@link DataRecord}. */
+        private int entryIdx;
+
+        /** @param walIter WAL iterator. */
+        DataEntryIterator(WALIterator walIter) {
+            this.walIter = walIter;
+        }
+
+        /** @return Current state. */
+        public CdcState state() {
+            return new CdcState(lastWalRec.get1(), entryIdx);
+        }
+
+        /** Initialize state. */
+        public void init(CdcState state) {
+            for (int i = 0; i < state.entryIndex(); i++) {
+                if (!hasNext())
+                    throw new IgniteException("Failed to restore entry index [rec=" + lastWalRec + ']');
+
+                next();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            advance();
+
+            return hasCurrent();
+        }
+
+        /** {@inheritDoc} */
+        @Override public DataEntry next() {
+            advance();
+
+            if (!hasCurrent())
+                throw new NoSuchElementException();
+
+            entryIdx++;
+
+            return entries.next();
+        }
+
+        /** */
+        private void advance() {
+            if (hasCurrent())
+                return;
+
+            while (walIter.hasNext()) {
+                lastWalRec = walIter.next();
+
+                DataRecord rec = (DataRecord)lastWalRec.get2();
+
+                entries = rec.writeEntries().iterator();

Review comment:
       This will create additional singletonList on each `DataRecord` that contains only one `DataEntry`.
   Let's use `DataRecord#entryCount` and `DataRecord#get(int)` to iterate over data records.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org