You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/05 18:49:42 UTC
[kafka] branch trunk updated: KAFKA-6704: InvalidStateStoreException from IQ when StreamThread closes store (#4801)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ef41369 KAFKA-6704: InvalidStateStoreException from IQ when StreamThread closes store (#4801)
ef41369 is described below
commit ef413699b658c939fa0a3930ddbface69bedd26e
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Tue Jun 5 14:49:36 2018 -0400
KAFKA-6704: InvalidStateStoreException from IQ when StreamThread closes store (#4801)
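While using an iterator obtained through Interactive Queries (IQ), it's possible to get an InvalidStateStoreException if the StreamThread closes the store during a range query. SegmentIterator.hasNext() now catches that exception and treats the closed segment's iterator as having no further entries instead of propagating it to the caller.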
Added a unit test to SegmentIteratorTest for this condition.
Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../streams/state/internals/RocksDBStore.java | 54 +++++++++++++---------
.../streams/state/internals/SegmentIterator.java | 12 ++++-
.../state/internals/RocksDBWindowStoreTest.java | 19 ++------
.../state/internals/SegmentIteratorTest.java | 14 ++++++
4 files changed, 60 insertions(+), 39 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index d2b8cd2..ff6c56a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
@@ -441,46 +442,46 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
}
}
- private class RocksDbIterator implements KeyValueIterator<Bytes, byte[]> {
+ private class RocksDbIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> implements KeyValueIterator<Bytes, byte[]> {
private final String storeName;
private final RocksIterator iter;
private volatile boolean open = true;
+ private KeyValue<Bytes, byte[]> next;
+
RocksDbIterator(final String storeName,
final RocksIterator iter) {
this.iter = iter;
this.storeName = storeName;
}
- byte[] peekRawKey() {
- return iter.key();
- }
-
- private KeyValue<Bytes, byte[]> getKeyValue() {
- return new KeyValue<>(new Bytes(iter.key()), iter.value());
- }
-
@Override
public synchronized boolean hasNext() {
if (!open) {
throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName));
}
-
- return iter.isValid();
+ return super.hasNext();
}
- /**
- * @throws NoSuchElementException if no next element exist
- */
@Override
public synchronized KeyValue<Bytes, byte[]> next() {
- if (!hasNext())
- throw new NoSuchElementException();
+ return super.next();
+ }
- final KeyValue<Bytes, byte[]> entry = this.getKeyValue();
- iter.next();
- return entry;
+ @Override
+ public KeyValue<Bytes, byte[]> makeNext() {
+ if (!iter.isValid()) {
+ return allDone();
+ } else {
+ next = this.getKeyValue();
+ iter.next();
+ return next;
+ }
+ }
+
+ private KeyValue<Bytes, byte[]> getKeyValue() {
+ return new KeyValue<>(new Bytes(iter.key()), iter.value());
}
@Override
@@ -500,7 +501,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
if (!hasNext()) {
throw new NoSuchElementException();
}
- return new Bytes(iter.key());
+ return next.key;
}
}
@@ -524,8 +525,17 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
}
@Override
- public synchronized boolean hasNext() {
- return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) <= 0;
+ public KeyValue<Bytes, byte[]> makeNext() {
+ final KeyValue<Bytes, byte[]> next = super.makeNext();
+
+ if (next == null) {
+ return allDone();
+ } else {
+ if (comparator.compare(next.key.get(), this.rawToKey) <= 0)
+ return next;
+ else
+ return allDone();
+ }
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
index 099cba1..331ffdb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
@@ -66,7 +66,7 @@ class SegmentIterator implements KeyValueIterator<Bytes, byte[]> {
@Override
public boolean hasNext() {
boolean hasNext = false;
- while ((currentIterator == null || !(hasNext = hasNextCondition.hasNext(currentIterator)) || !currentSegment.isOpen())
+ while ((currentIterator == null || !(hasNext = hasNextConditionHasNext()) || !currentSegment.isOpen())
&& segments.hasNext()) {
close();
currentSegment = segments.next();
@@ -83,6 +83,16 @@ class SegmentIterator implements KeyValueIterator<Bytes, byte[]> {
return currentIterator != null && hasNext;
}
+ private boolean hasNextConditionHasNext() {
+ boolean hasNext = false;
+ try {
+ hasNext = hasNextCondition.hasNext(currentIterator);
+ } catch (InvalidStateStoreException e) {
+ //already closed so ignore
+ }
+ return hasNext;
+ }
+
public KeyValue<Bytes, byte[]> next() {
if (!hasNext()) {
throw new NoSuchElementException();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index be4ede8..c436e9e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
@@ -61,7 +60,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
@SuppressWarnings("PointlessArithmeticExpression")
public class RocksDBWindowStoreTest {
@@ -747,7 +745,7 @@ public class RocksDBWindowStoreTest {
}
@Test
- public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() {
+ public void shouldCloseOpenIteratorsWhenStoreIsClosedAndNotThrowInvalidStateStoreExceptionOnHasNext() {
windowStore = createWindowStore(context);
context.setRecordContext(createRecordContext(0));
windowStore.put(1, "one", 1L);
@@ -757,20 +755,9 @@ public class RocksDBWindowStoreTest {
final WindowStoreIterator<String> iterator = windowStore.fetch(1, 1L, 3L);
assertTrue(iterator.hasNext());
windowStore.close();
- try {
- //noinspection ResultOfMethodCallIgnored
- iterator.hasNext();
- fail("should have thrown InvalidStateStoreException on closed store");
- } catch (final InvalidStateStoreException e) {
- // ok
- }
- try {
- iterator.next();
- fail("should have thrown InvalidStateStoreException on closed store");
- } catch (final InvalidStateStoreException e) {
- // ok
- }
+ assertFalse(iterator.hasNext());
+
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 7a7b266..8045964 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -31,6 +31,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
+import java.util.Collections;
import java.util.NoSuchElementException;
import static org.junit.Assert.assertEquals;
@@ -105,6 +106,19 @@ public class SegmentIteratorTest {
}
@Test
+ public void shouldNotThrowExceptionOnHasNextWhenStoreClosed() {
+ iterator = new SegmentIterator(Collections.singletonList(segmentOne).iterator(),
+ hasNextCondition,
+ Bytes.wrap("a".getBytes()),
+ Bytes.wrap("z".getBytes()));
+
+
+ iterator.currentIterator = segmentOne.all();
+ segmentOne.close();
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
public void shouldOnlyIterateOverSegmentsInRange() {
iterator = new SegmentIterator(Arrays.asList(segmentOne, segmentTwo).iterator(),
hasNextCondition,
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.