You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2021/05/04 15:37:40 UTC
[ignite-3] 07/08: IGNITE-14389 Implemented cursor for ranges and
watches.
This is an automated email from the ASF dual-hosted git repository.
agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit c465eff81ec2ff4ba6d084dc48d2d5b15b72a2d9
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Thu Apr 29 02:44:00 2021 +0300
IGNITE-14389 Implemented cursor for ranges and watches.
---
.../ignite/internal/metastorage/server/Entry.java | 17 +
.../internal/metastorage/server/EntryEvent.java | 58 +++
.../metastorage/server/KeyValueStorage.java | 33 +-
.../server/SimpleInMemoryKeyValueStorage.java | 333 +++++++++------
.../ignite/internal/metastorage/server/Value.java | 17 +
.../ignite/internal/metastorage/server/Watch.java | 45 ---
.../internal/metastorage/server/WatchEvent.java | 54 +++
.../internal/metastorage/server/Watcher.java | 13 -
.../internal/metastorage/server/WatcherImpl.java | 58 ---
.../server/SimpleInMemoryKeyValueStorageTest.java | 446 ++++++++++++++++++---
10 files changed, 764 insertions(+), 310 deletions(-)
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
index 263a88b..87b5471 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.ignite.internal.metastorage.server;
import org.jetbrains.annotations.NotNull;
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java
new file mode 100644
index 0000000..554a3a7
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+/**
+ * Represent an change event for particular key and entry.
+ */
+public class EntryEvent {
+ /** Old (previous) entry. */
+ private final Entry oldEntry;
+
+ /** New (current) entry. */
+ private final Entry entry;
+
+ /**
+ * Constructs event with given old and new entries.
+ *
+ * @param oldEntry Old entry.
+ * @param curEntry New entry.
+ */
+ EntryEvent(Entry oldEntry, Entry curEntry) {
+ this.oldEntry = oldEntry;
+ this.entry = curEntry;
+ }
+
+ /**
+ * Returns old entry.
+ *
+ * @return Old entry.
+ */
+ public Entry oldEntry() {
+ return oldEntry;
+ }
+
+ /**
+ * Rreturns new entry.
+ *
+ * @return New entry.
+ */
+ public Entry entry() {
+ return entry;
+ }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 526e4fb..5d6da44 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -1,13 +1,28 @@
-package org.apache.ignite.internal.metastorage.server;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-import org.jetbrains.annotations.NotNull;
+package org.apache.ignite.internal.metastorage.server;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.jetbrains.annotations.NotNull;
public interface KeyValueStorage {
-
long revision();
long updateCounter();
@@ -44,11 +59,15 @@ public interface KeyValueStorage {
@NotNull
Collection<Entry> getAndRemoveAll(List<byte[]> keys);
- Iterator<Entry> range(byte[] keyFrom, byte[] keyTo);
+ Cursor<Entry> range(byte[] keyFrom, byte[] keyTo);
+
+ Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound);
+
+ Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev);
- Iterator<Entry> iterate(byte[] key);
+ Cursor<WatchEvent> watch(byte[] key, long rev);
- //Iterator<Entry> iterate(long rev);
+ Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev);
void compact();
}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 32f720e..b37c96a 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -1,8 +1,34 @@
-package org.apache.ignite.internal.metastorage.server;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-import java.util.*;
-import java.util.function.Consumer;
+package org.apache.ignite.internal.metastorage.server;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.Predicate;
import org.apache.ignite.metastorage.common.Cursor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.TestOnly;
@@ -13,15 +39,11 @@ import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
* WARNING: Only for test purposes and only for non-distributed (one static instance) storage.
*/
public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
- private static final Comparator<byte[]> LEXICOGRAPHIC_COMPARATOR = Arrays::compare;
+ private static final Comparator<byte[]> CMP = Arrays::compare;
private static final long LATEST_REV = -1;
- private final Watcher watcher;
-
- private final List<Cursor<Entry>> rangeCursors = new ArrayList<>();
-
- private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+ private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(CMP);
private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new TreeMap<>();
@@ -31,10 +53,6 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
private final Object mux = new Object();
- public SimpleInMemoryKeyValueStorage(Watcher watcher) {
- this.watcher = watcher;
- }
-
@Override public long revision() {
return rev;
}
@@ -141,9 +159,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
List<byte[]> vals = new ArrayList<>(keys.size());
- for (int i = 0; i < keys.size(); i++) {
- byte[] key = keys.get(i);
-
+ for (byte[] key : keys) {
Entry e = doGet(key, LATEST_REV, false);
if (e.empty() || e.tombstone())
@@ -169,9 +185,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
List<byte[]> vals = new ArrayList<>(keys.size());
- for (int i = 0; i < keys.size(); i++) {
- byte[] key = keys.get(i);
-
+ for (byte[] key : keys) {
Entry e = doGet(key, LATEST_REV, false);
res.add(e);
@@ -190,90 +204,45 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
return res;
}
- @Override
- public Iterator<Entry> range(byte[] keyFrom, byte[] keyTo) {
- return null;
+ @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo) {
+ return new RangeCursor(keyFrom, keyTo, rev);
}
- @Override public Iterator<Entry> iterate(byte[] keyFrom) {
- synchronized (mux) {
- NavigableMap<byte[], List<Long>> tailMap = keysIdx.tailMap(keyFrom, true);
-
- final Iterator<Map.Entry<byte[], List<Long>>> it = tailMap.entrySet().iterator();
-
- return new Iterator<>() {
- private Map.Entry<byte[], List<Long>> curr;
- private boolean hasNext;
-
- private void advance() {
- if (it.hasNext()) {
- Map.Entry<byte[], List<Long>> e = it.next();
-
- byte[] key = e.getKey();
-
- if (!isPrefix(keyFrom, key))
- hasNext = false;
- else {
- curr = e;
-
- hasNext = true;
- }
- } else
- hasNext = false;
- }
-
- @Override
- public boolean hasNext() {
- synchronized (mux) {
- if (curr == null)
- advance();
-
- return hasNext;
- }
- }
-
- @Override
- public Entry next() {
- synchronized (mux) {
- if (!hasNext())
- throw new NoSuchElementException();
-
- Map.Entry<byte[], List<Long>> e = curr;
-
- curr = null;
+ @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
+ return new RangeCursor(keyFrom, keyTo, revUpperBound);
+ }
- byte[] key = e.getKey();
+ @Override public Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) {
+ assert keyFrom != null;
+ assert rev > 0;
- List<Long> revs = e.getValue();
+ return new WatchCursor(rev, k ->
+ CMP.compare(keyFrom, k) <= 0 && (keyTo == null || CMP.compare(k, keyTo) < 0)
+ );
+ }
- long rev = revs == null || revs.isEmpty() ? 0 : lastRevision(revs);
+ @Override public Cursor<WatchEvent> watch(byte[] key, long rev) {
+ assert key != null;
+ assert rev > 0;
- if (rev == 0) {
- throw new IllegalStateException("rev == 0");
- //return new AbstractMap.SimpleImmutableEntry<>(key, null);
- }
+ return new WatchCursor(rev, k -> CMP.compare(k, key) == 0);
+ }
- NavigableMap<byte[], Value> vals = revsIdx.get(rev);
+ @Override public Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev) {
+ assert keys != null && !keys.isEmpty();
+ assert rev > 0;
- if (vals == null || vals.isEmpty()) {
- throw new IllegalStateException("vals == null || vals.isEmpty()");
- //return new AbstractMap.SimpleImmutableEntry<>(key, null);
- }
+ TreeSet<byte[]> keySet = new TreeSet<>(CMP);
- Value val = vals.get(key);
+ keySet.addAll(keys);
- return val.tombstone() ?
- Entry.tombstone(key, rev, val.updateCounter()) :
- new Entry(key, val.bytes(), rev, val.updateCounter());
- }
- }
- };
- }
+ return new WatchCursor(rev, keySet::contains);
}
+
@Override public void compact() {
synchronized (mux) {
- NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+ NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(CMP);
NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx = new TreeMap<>();
@@ -302,7 +271,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
NavigableMap<byte[], Value> compactedKv = compactedRevsIdx.computeIfAbsent(
lastRev,
- k -> new TreeMap<>(LEXICOGRAPHIC_COMPARATOR)
+ k -> new TreeMap<>(CMP)
);
compactedKv.put(key, lastVal);
@@ -409,7 +378,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
revs.add(curRev);
// Update revsIdx.
- NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+ NavigableMap<byte[], Value> entries = new TreeMap<>(CMP);
Value val = new Value(bytes, curUpdCntr);
@@ -423,7 +392,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
private long doPutAll(long curRev, List<byte[]> keys, List<byte[]> bytesList) {
synchronized (mux) {
// Update revsIdx.
- NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+ NavigableMap<byte[], Value> entries = new TreeMap<>(CMP);
for (int i = 0; i < keys.size(); i++) {
byte[] key = keys.get(i);
@@ -452,19 +421,6 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
}
}
-
- private static boolean isPrefix(byte[] pref, byte[] term) {
- if (pref.length > term.length)
- return false;
-
- for (int i = 0; i < pref.length - 1; i++) {
- if (pref[i] != term[i])
- return false;
- }
-
- return true;
- }
-
private static long lastRevision(List<Long> revs) {
return revs.get(revs.size() - 1);
}
@@ -481,55 +437,184 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
private final byte[] keyFrom;
private final byte[] keyTo;
private final long rev;
- private byte[] curKey;
+ private Entry nextRetEntry;
+ private byte[] lastRetKey;
+ private boolean finished;
- public RangeCursor(byte[] keyFrom, byte[] keyTo, long rev) {
+ RangeCursor(byte[] keyFrom, byte[] keyTo, long rev) {
this.keyFrom = keyFrom;
this.keyTo = keyTo;
this.rev = rev;
}
@Override public void close() throws Exception {
-
+ // TODO: implement.
}
@NotNull
@Override public Iterator<Entry> iterator() {
- return new Iterator<Entry>() {
+ return new Iterator<>() {
@Override public boolean hasNext() {
synchronized (mux) {
- byte[] key = keysIdx.ceilingKey(curKey);
+ while (true) {
+ if (finished)
+ return false;
+
+ if (nextRetEntry != null)
+ return true;
- return key != null;
+ byte[] key = lastRetKey;
+
+ while (!finished || nextRetEntry == null) {
+ Map.Entry<byte[], List<Long>> e =
+ key == null ? keysIdx.ceilingEntry(keyFrom) : keysIdx.higherEntry(key);
+
+ if (e == null) {
+ finished = true;
+
+ break;
+ }
+
+ key = e.getKey();
+
+ if (keyTo != null && CMP.compare(key, keyTo) >= 0) {
+ finished = true;
+
+ break;
+ }
+
+ List<Long> revs = e.getValue();
+
+ assert revs != null && !revs.isEmpty() :
+ "Revisions should not be empty: [revs=" + revs + ']';
+
+ long lastRev = maxRevision(revs, rev);
+
+ if (lastRev == -1)
+ continue;
+
+ Entry entry = doGetValue(key, lastRev);
+
+ assert !entry.empty() : "Iterator should not return empty entry.";
+
+ nextRetEntry = entry;
+
+ break;
+ }
+ }
}
}
@Override public Entry next() {
synchronized (mux) {
- Map.Entry<byte[], List<Long>> e = keysIdx.ceilingEntry(curKey);
-
- if (e == null)
- throw new NoSuchElementException();
+ while (true) {
+ if (finished)
+ throw new NoSuchElementException();
- List<Long> revs = e.getValue();
+ if (nextRetEntry != null) {
+ Entry e = nextRetEntry;
- assert revs != null && !revs.isEmpty() :
- "Revisions should not be empty: [revs=" + revs + ']';
+ nextRetEntry = null;
- //lastRevision(re)
+ lastRetKey = e.key();
- return null;
+ return e;
+ } else
+ hasNext();
+ }
}
}
};
}
+ }
- @Override public void forEach(Consumer<? super Entry> action) {
- Cursor.super.forEach(action);
+ private class WatchCursor implements Cursor<WatchEvent> {
+ private final Predicate<byte[]> p;
+ private long lastRetRev;
+ private long nextRetRev = -1;
+
+ WatchCursor(long rev, Predicate<byte[]> p) {
+ this.p = p;
+ this.lastRetRev = rev - 1;
}
- @Override public Spliterator<Entry> spliterator() {
- return Cursor.super.spliterator();
+ @Override public void close() throws Exception {
+ // TODO: implement
+ }
+
+ @NotNull
+ @Override public Iterator<WatchEvent> iterator() {
+ return new Iterator<>() {
+ @Override public boolean hasNext() {
+ synchronized (mux) {
+ if (nextRetRev != -1)
+ return true;
+
+ while (true) {
+ long curRev = lastRetRev + 1;
+
+ NavigableMap<byte[], Value> entries = revsIdx.get(curRev);
+
+ if (entries == null)
+ return false;
+
+ for (byte[] key : entries.keySet()) {
+ if (p.test(key)) {
+ nextRetRev = curRev;
+
+ return true;
+ }
+ }
+
+ lastRetRev++;
+ }
+ }
+ }
+
+ @Override public WatchEvent next() {
+ synchronized (mux) {
+ while (true) {
+ if (nextRetRev != -1) {
+ NavigableMap<byte[], Value> entries = revsIdx.get(nextRetRev);
+
+ if (entries == null)
+ return null;
+
+ List<EntryEvent> evts = new ArrayList<>(entries.size());
+
+ for (Map.Entry<byte[], Value> e : entries.entrySet()) {
+ byte[] key = e.getKey();
+
+ Value val = e.getValue();
+
+ if (p.test(key)) {
+ Entry newEntry;
+
+ if (val.tombstone())
+ newEntry = Entry.tombstone(key, nextRetRev, val.updateCounter());
+ else
+ newEntry = new Entry(key, val.bytes(), nextRetRev, val.updateCounter());
+
+ Entry oldEntry = doGet(key, nextRetRev - 1, false);
+
+ evts.add(new EntryEvent(oldEntry, newEntry));
+ }
+ }
+
+ if (evts.isEmpty())
+ continue;
+
+ lastRetRev = nextRetRev;
+
+ nextRetRev = -1;
+
+ return new WatchEvent(evts);
+ } else if (!hasNext())
+ return null;
+ }
+ }
+ }
+ };
}
}
}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
index 250a5ea..a438fd4 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.ignite.internal.metastorage.server;
import org.jetbrains.annotations.NotNull;
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
deleted file mode 100644
index 26cfa5c..0000000
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.ignite.internal.metastorage.server;
-
-import org.jetbrains.annotations.Nullable;
-
-import java.util.Arrays;
-import java.util.Comparator;
-
-public class Watch {
- private static final Comparator<byte[]> CMP = Arrays::compare;
-
- private static final long ANY_REVISION = -1;
-
- @Nullable
- private byte[] startKey;
-
- @Nullable
- private byte[] endKey;
-
- long rev = ANY_REVISION;
-
- public void startKey(byte[] startKey) {
- this.startKey = startKey;
- }
-
- public void endKey(byte[] endKey) {
- this.endKey = endKey;
- }
-
- public void revision(long rev) {
- this.rev = rev;
- }
-
- public void notify(Entry e) {
- if (startKey != null && CMP.compare(e.key(), startKey) < 0)
- return;
-
- if (endKey != null && CMP.compare(e.key(), endKey) > 0)
- return;
-
- if (rev != ANY_REVISION && e.revision() <= rev)
- return;
-
- System.out.println("Entry: key=" + new String(e.key()) + ", value=" + new String(e.value()) + ", rev=" + e.revision());
- }
-}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEvent.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEvent.java
new file mode 100644
index 0000000..561f203
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEvent.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import java.util.Collection;
+import java.util.List;
+
+public class WatchEvent {
+ private final List<EntryEvent> entryEvts;
+
+ private final boolean batch;
+
+ /**
+ * Constructs an watch event with given entry events collection.
+ *
+ * @param entryEvts Events for entries corresponding to an update under one revision.
+ */
+ public WatchEvent(List<EntryEvent> entryEvts) {
+ assert entryEvts != null && !entryEvts.isEmpty();
+
+ this.batch = entryEvts.size() > 1;
+ this.entryEvts = entryEvts;
+ }
+
+ public boolean batch() {
+ return batch;
+ }
+
+ public Collection<EntryEvent> entryEvents() {
+ return entryEvts;
+ }
+
+ public EntryEvent entryEvent() {
+ if (batch)
+ throw new IllegalStateException("Watch event represents a batch of events.");
+
+ return entryEvts.get(0);
+ }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java
deleted file mode 100644
index 5516d06..0000000
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.ignite.internal.metastorage.server;
-
-import org.jetbrains.annotations.NotNull;
-
-public interface Watcher {
- void register(@NotNull Watch watch);
-
- void notify(@NotNull Entry e);
-
- //TODO: implement
- void cancel(@NotNull Watch watch);
-}
-
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java
deleted file mode 100644
index dc126a0..0000000
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package org.apache.ignite.internal.metastorage.server;
-
-import org.jetbrains.annotations.NotNull;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-public class WatcherImpl implements Watcher {
- private final BlockingQueue<Entry> queue = new LinkedBlockingQueue<>();
-
- private final List<Watch> watches = new ArrayList<>();
-
- private volatile boolean stop;
-
- private final Object mux = new Object();
-
- @Override public void register(@NotNull Watch watch) {
- synchronized (mux) {
- watches.add(watch);
- }
- }
-
- @Override public void notify(@NotNull Entry e) {
- queue.offer(e);
- }
-
- @Override
- public void cancel(@NotNull Watch watch) {
- throw new UnsupportedOperationException("Not implemented yet.");
- }
-
- public void shutdown() {
- stop = true;
- }
-
- private class WatcherWorker implements Runnable {
- @Override public void run() {
- while (!stop) {
- try {
- Entry e = queue.poll(100, TimeUnit.MILLISECONDS);
-
- if (e != null) {
- synchronized (mux) {
- watches.forEach(w -> w.notify(e));
- }
- }
- }
- catch (InterruptedException interruptedException) {
- // No-op.
- }
- }
- }
- }
-}
-
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index 4a73137..27df790 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -1,17 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.ignite.internal.metastorage.server;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
-import java.util.function.Function;
+import java.util.NoSuchElementException;
import java.util.stream.Collectors;
+import org.apache.ignite.metastorage.common.Cursor;
import org.apache.ignite.metastorage.common.Key;
-import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import static java.util.function.Function.identity;
import static org.junit.jupiter.api.Assertions.*;
class SimpleInMemoryKeyValueStorageTest {
@@ -19,7 +36,7 @@ class SimpleInMemoryKeyValueStorageTest {
@BeforeEach
public void setUp() {
- storage = new SimpleInMemoryKeyValueStorage(new NoOpWatcher());
+ storage = new SimpleInMemoryKeyValueStorage();
}
@Test
@@ -91,7 +108,7 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(4, entries.size());
- Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+ Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
// Test regular put value.
Entry e1 = map.get(new Key(key1));
@@ -166,7 +183,7 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(4, entries.size());
- Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+ Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
// Test regular put value.
Entry e1 = map.get(new Key(key1));
@@ -204,7 +221,7 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(4, entries.size());
- map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+ map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
// Test regular put value.
e1 = map.get(new Key(key1));
@@ -308,7 +325,7 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(4, entries.size());
- Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+ Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
// Test regular put value.
Entry e1 = map.get(new Key(key1));
@@ -382,7 +399,7 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(3, entries.size());
- Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+ Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
// Test regular put value.
Entry e1 = map.get(new Key(key1));
@@ -417,7 +434,7 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(4, entries.size());
- map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+ map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
// Test regular put value.
e1 = map.get(new Key(key1));
@@ -601,7 +618,7 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(4, entries.size());
- Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+ Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
// Test regular put value.
Entry e1 = map.get(new Key(key1));
@@ -676,7 +693,7 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(4, entries.size());
- Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+ Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
// Test regular put value.
Entry e1 = map.get(new Key(key1));
@@ -719,7 +736,7 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(4, entries.size());
- map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+ map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
// Test regular put value.
e1 = map.get(new Key(key1));
@@ -987,60 +1004,377 @@ class SimpleInMemoryKeyValueStorageTest {
}
@Test
- public void iterate() {
- TreeMap<String, String> expFooMap = new TreeMap<>();
- TreeMap<String, String> expKeyMap = new TreeMap<>();
- TreeMap<String, String> expZooMap = new TreeMap<>();
+ public void rangeCursor() {
+ byte[] key1 = k(1);
+ byte[] val1 = kv(1, 1);
- fill("foo", storage, expFooMap);
- fill("key", storage, expKeyMap);
- fill("zoo", storage, expZooMap);
+ byte[] key2 = k(2);
+ byte[] val2 = kv(2, 2);
- assertEquals(300, storage.revision());
- assertEquals(300, storage.updateCounter());
+ byte[] key3 = k(3);
+ byte[] val3 = kv(3, 3);
- assertIterate("key", storage, expKeyMap);
- assertIterate("zoo", storage, expZooMap);
- assertIterate("foo", storage, expFooMap);
- }
- private void assertIterate(String pref, KeyValueStorage storage, TreeMap<String, String> expMap) {
- Iterator<Entry> it = storage.iterate((pref + "_").getBytes());
- Iterator<Map.Entry<String, String>> expIt = expMap.entrySet().iterator();
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
+
+ storage.putAll(List.of(key1, key2, key3), List.of(val1, val2, val3));
+
+ assertEquals(1, storage.revision());
+ assertEquals(3, storage.updateCounter());
+
+ // Range for latest revision without max bound.
+ Cursor<Entry> cur = storage.range(key1, null);
+
+ Iterator<Entry> it = cur.iterator();
+
+ assertTrue(it.hasNext());
+
+ Entry e1 = it.next();
+
+ assertFalse(e1.empty());
+ assertFalse(e1.tombstone());
+ assertArrayEquals(key1, e1.key());
+ assertArrayEquals(val1, e1.value());
+ assertEquals(1, e1.revision());
+ assertEquals(1, e1.updateCounter());
+
+ assertTrue(it.hasNext());
+
+ Entry e2 = it.next();
- // Order.
- while (it.hasNext()) {
- Entry entry = it.next();
- Map.Entry<String, String> expEntry = expIt.next();
+ assertFalse(e2.empty());
+ assertFalse(e2.tombstone());
+ assertArrayEquals(key2, e2.key());
+ assertArrayEquals(val2, e2.value());
+ assertEquals(1, e2.revision());
+ assertEquals(2, e2.updateCounter());
+
+ // Deliberately don't call it.hasNext()
+
+ Entry e3 = it.next();
+
+ assertFalse(e3.empty());
+ assertFalse(e3.tombstone());
+ assertArrayEquals(key3, e3.key());
+ assertArrayEquals(val3, e3.value());
+ assertEquals(1, e3.revision());
+ assertEquals(3, e3.updateCounter());
+
+ assertFalse(it.hasNext());
- assertEquals(expEntry.getKey(), new String(entry.key()));
- assertEquals(expEntry.getValue(), new String(entry.value()));
+ try {
+ it.next();
+
+ fail();
+ }
+ catch (NoSuchElementException e) {
+ System.out.println();
+ // No-op.
}
- // Range boundaries.
- it = storage.iterate((pref + '_').getBytes());
+ // Range for latest revision with max bound.
+ cur = storage.range(key1, key3);
+
+ it = cur.iterator();
+
+ assertTrue(it.hasNext());
+
+ e1 = it.next();
+
+ assertFalse(e1.empty());
+ assertFalse(e1.tombstone());
+ assertArrayEquals(key1, e1.key());
+ assertArrayEquals(val1, e1.value());
+ assertEquals(1, e1.revision());
+ assertEquals(1, e1.updateCounter());
- while (it.hasNext()) {
- Entry entry = it.next();
+ assertTrue(it.hasNext());
- assertTrue(expMap.containsKey(new String(entry.key())));
+ e2 = it.next();
+
+ assertFalse(e2.empty());
+ assertFalse(e2.tombstone());
+ assertArrayEquals(key2, e2.key());
+ assertArrayEquals(val2, e2.value());
+ assertEquals(1, e2.revision());
+ assertEquals(2, e2.updateCounter());
+
+ assertFalse(it.hasNext());
+
+ try {
+ it.next();
+
+ fail();
+ }
+ catch (NoSuchElementException e) {
+ System.out.println();
+ // No-op.
}
}
- private static void fill(String pref, KeyValueStorage storage, TreeMap<String, String> expMap) {
- for (int i = 0; i < 100; i++) {
- String keyStr = pref + '_' + i;
+ @Test
+ public void watchCursorForRange() {
+ byte[] key1 = k(1);
+ byte[] val1_1 = kv(1, 11);
- String valStr = "val_" + i;
+ byte[] key2 = k(2);
+ byte[] val2_1 = kv(2, 21);
+ byte[] val2_2 = kv(2, 22);
+
+ byte[] key3 = k(3);
+ byte[] val3_1 = kv(3, 31);
- expMap.put(keyStr, valStr);
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
- byte[] key = keyStr.getBytes();
+ // Watch for all updates starting from revision 2.
+ Cursor<WatchEvent> cur = storage.watch(key1, null, 2);
- byte[] val = valStr.getBytes();
+ Iterator<WatchEvent> it = cur.iterator();
- storage.getAndPut(key, val);
- }
+ assertFalse(it.hasNext());
+ assertNull(it.next());
+
+ storage.putAll(List.of(key1, key2), List.of(val1_1, val2_1));
+
+ assertEquals(1, storage.revision());
+ assertEquals(2, storage.updateCounter());
+
+ // Revision is less than 2.
+ assertFalse(it.hasNext());
+ assertNull(it.next());
+
+ storage.putAll(List.of(key2, key3), List.of(val2_2, val3_1));
+
+ assertEquals(2, storage.revision());
+ assertEquals(4, storage.updateCounter());
+
+ // Revision is 2.
+ assertTrue(it.hasNext());
+
+ WatchEvent watchEvent = it.next();
+
+ assertTrue(watchEvent.batch());
+
+ Map<Key, EntryEvent> map = watchEvent.entryEvents().stream()
+ .collect(Collectors.toMap(evt -> new Key(evt.entry().key()), identity()));
+
+ assertEquals(2, map.size());
+
+ // First update under revision.
+ EntryEvent e2 = map.get(new Key(key2));
+
+ assertNotNull(e2);
+
+ Entry oldEntry2 = e2.oldEntry();
+
+ assertFalse(oldEntry2.empty());
+ assertFalse(oldEntry2.tombstone());
+ assertEquals(1, oldEntry2.revision());
+ assertEquals(2, oldEntry2.updateCounter());
+ assertArrayEquals(key2, oldEntry2.key());
+ assertArrayEquals(val2_1, oldEntry2.value());
+
+ Entry newEntry2 = e2.entry();
+
+ assertFalse(newEntry2.empty());
+ assertFalse(newEntry2.tombstone());
+ assertEquals(2, newEntry2.revision());
+ assertEquals(3, newEntry2.updateCounter());
+ assertArrayEquals(key2, newEntry2.key());
+ assertArrayEquals(val2_2, newEntry2.value());
+
+ // Second update under revision.
+ EntryEvent e3 = map.get(new Key(key3));
+
+ assertNotNull(e3);
+
+ Entry oldEntry3 = e3.oldEntry();
+
+ assertTrue(oldEntry3.empty());
+ assertFalse(oldEntry3.tombstone());
+ assertArrayEquals(key3, oldEntry3.key());
+
+ Entry newEntry3 = e3.entry();
+
+ assertFalse(newEntry3.empty());
+ assertFalse(newEntry3.tombstone());
+ assertEquals(2, newEntry3.revision());
+ assertEquals(4, newEntry3.updateCounter());
+ assertArrayEquals(key3, newEntry3.key());
+ assertArrayEquals(val3_1, newEntry3.value());
+
+ assertFalse(it.hasNext());
+
+ storage.remove(key1);
+
+ assertTrue(it.hasNext());
+
+ watchEvent = it.next();
+
+ assertFalse(watchEvent.batch());
+
+ EntryEvent e1 = watchEvent.entryEvent();
+
+ Entry oldEntry1 = e1.oldEntry();
+
+ assertFalse(oldEntry1.empty());
+ assertFalse(oldEntry1.tombstone());
+ assertEquals(1, oldEntry1.revision());
+ assertEquals(1, oldEntry1.updateCounter());
+ assertArrayEquals(key1, oldEntry1.key());
+ assertArrayEquals(val1_1, oldEntry1.value());
+
+ Entry newEntry1 = e1.entry();
+
+ assertFalse(newEntry1.empty());
+ assertTrue(newEntry1.tombstone());
+ assertEquals(3, newEntry1.revision());
+ assertEquals(5, newEntry1.updateCounter());
+ assertArrayEquals(key1, newEntry1.key());
+ assertNull(newEntry1.value());
+
+ assertFalse(it.hasNext());
+ }
+
+
+ @Test
+ public void watchCursorForKey() {
+ byte[] key1 = k(1);
+ byte[] val1_1 = kv(1, 11);
+ byte[] val1_2 = kv(1, 12);
+
+ byte[] key2 = k(2);
+ byte[] val2_1 = kv(2, 21);
+ byte[] val2_2 = kv(2, 22);
+
+
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
+
+ Cursor<WatchEvent> cur = storage.watch(key1, 1);
+
+ Iterator<WatchEvent> it = cur.iterator();
+
+ assertFalse(it.hasNext());
+ assertNull(it.next());
+
+ storage.putAll(List.of(key1, key2), List.of(val1_1, val2_1));
+
+ assertEquals(1, storage.revision());
+ assertEquals(2, storage.updateCounter());
+
+ assertTrue(it.hasNext());
+
+ WatchEvent watchEvent = it.next();
+
+ assertFalse(watchEvent.batch());
+
+ EntryEvent e1 = watchEvent.entryEvent();
+
+ Entry oldEntry1 = e1.oldEntry();
+
+ assertTrue(oldEntry1.empty());
+ assertFalse(oldEntry1.tombstone());
+
+ Entry newEntry1 = e1.entry();
+
+ assertFalse(newEntry1.empty());
+ assertFalse(newEntry1.tombstone());
+ assertEquals(1, newEntry1.revision());
+ assertEquals(1, newEntry1.updateCounter());
+ assertArrayEquals(key1, newEntry1.key());
+ assertArrayEquals(val1_1, newEntry1.value());
+
+ assertFalse(it.hasNext());
+
+ storage.put(key2, val2_2);
+
+ assertFalse(it.hasNext());
+
+ storage.put(key1, val1_2);
+
+ assertTrue(it.hasNext());
+
+ watchEvent = it.next();
+
+ assertFalse(watchEvent.batch());
+
+ e1 = watchEvent.entryEvent();
+
+ oldEntry1 = e1.oldEntry();
+
+ assertFalse(oldEntry1.empty());
+ assertFalse(oldEntry1.tombstone());
+ assertEquals(1, oldEntry1.revision());
+ assertEquals(1, oldEntry1.updateCounter());
+ assertArrayEquals(key1, newEntry1.key());
+ assertArrayEquals(val1_1, newEntry1.value());
+
+ newEntry1 = e1.entry();
+
+ assertFalse(newEntry1.empty());
+ assertFalse(newEntry1.tombstone());
+ assertEquals(3, newEntry1.revision());
+ assertEquals(4, newEntry1.updateCounter());
+ assertArrayEquals(key1, newEntry1.key());
+ assertArrayEquals(val1_2, newEntry1.value());
+
+ assertFalse(it.hasNext());
+ }
+
+ @Test
+ public void watchCursorForKeys() {
+ byte[] key1 = k(1);
+ byte[] val1_1 = kv(1, 11);
+ byte[] val1_2 = kv(1, 12);
+
+ byte[] key2 = k(2);
+ byte[] val2_1 = kv(2, 21);
+ byte[] val2_2 = kv(2, 22);
+
+ byte[] key3 = k(3);
+ byte[] val3_1 = kv(3, 31);
+ byte[] val3_2 = kv(3, 32);
+
+
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
+
+ Cursor<WatchEvent> cur = storage.watch(List.of(key1, key2), 1);
+
+ Iterator<WatchEvent> it = cur.iterator();
+
+ assertFalse(it.hasNext());
+ assertNull(it.next());
+
+ storage.putAll(List.of(key1, key2, key3), List.of(val1_1, val2_1, val3_1));
+
+ assertEquals(1, storage.revision());
+ assertEquals(3, storage.updateCounter());
+
+ assertTrue(it.hasNext());
+
+ WatchEvent watchEvent = it.next();
+
+ assertTrue(watchEvent.batch());
+
+ assertFalse(it.hasNext());
+
+ storage.put(key2, val2_2);
+
+ assertTrue(it.hasNext());
+
+ watchEvent = it.next();
+
+ assertFalse(watchEvent.batch());
+
+ assertFalse(it.hasNext());
+
+ storage.put(key3, val3_2);
+
+ assertFalse(it.hasNext());
}
private static void fill(KeyValueStorage storage, int keySuffix, int num) {
@@ -1055,18 +1389,4 @@ class SimpleInMemoryKeyValueStorageTest {
private static byte[] kv(int k, int v) {
return ("key" + k + '_' + "val" + v).getBytes();
}
-
- private static class NoOpWatcher implements Watcher {
- @Override public void register(@NotNull Watch watch) {
- // No-op.
- }
-
- @Override public void notify(@NotNull Entry e) {
- // No-op.
- }
-
- @Override public void cancel(@NotNull Watch watch) {
- // No-op.
- }
- }
}
\ No newline at end of file