You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/01 20:07:48 UTC
kafka git commit: MINOR: improve MinTimestampTrackerTest and fix NPE when null element removed
Repository: kafka
Updated Branches:
refs/heads/trunk 0fba52960 -> b380a82d5
MINOR: improve MinTimestampTrackerTest and fix NPE when null element removed
Author: Damian Guy <da...@gmail.com>
Reviewers: Matthias J. Sax, Guozhang Wang
Closes #2611 from dguy/testing
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b380a82d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b380a82d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b380a82d
Branch: refs/heads/trunk
Commit: b380a82d5be7c68141590467911ecb61db45ed1e
Parents: 0fba529
Author: Damian Guy <da...@gmail.com>
Authored: Wed Mar 1 12:07:46 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Mar 1 12:07:46 2017 -0800
----------------------------------------------------------------------
.../internals/MinTimestampTracker.java | 15 ++-
.../internals/MinTimestampTrackerTest.java | 102 ++++++++-----------
2 files changed, 55 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b380a82d/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
index ef7d990..a67675c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
@@ -34,7 +34,7 @@ public class MinTimestampTracker<E> implements TimestampTracker<E> {
/**
* @throws NullPointerException if the element is null
*/
- public void addElement(Stamped<E> elem) {
+ public void addElement(final Stamped<E> elem) {
if (elem == null) throw new NullPointerException();
Stamped<E> minElem = descendingSubsequence.peekLast();
@@ -45,12 +45,19 @@ public class MinTimestampTracker<E> implements TimestampTracker<E> {
descendingSubsequence.offerLast(elem);
}
- public void removeElement(Stamped<E> elem) {
- if (elem != null && descendingSubsequence.peekFirst() == elem)
+ public void removeElement(final Stamped<E> elem) {
+ if (elem == null) {
+ return;
+ }
+
+ if (descendingSubsequence.peekFirst() == elem) {
descendingSubsequence.removeFirst();
+ }
- if (descendingSubsequence.isEmpty())
+ if (descendingSubsequence.isEmpty()) {
lastKnownTime = elem.timestamp;
+ }
+
}
public int size() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b380a82d/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
index c398dc5..f6a1518 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
@@ -16,77 +16,63 @@
*/
package org.apache.kafka.streams.processor.internals;
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
import org.junit.Test;
public class MinTimestampTrackerTest {
- private Stamped<String> elem(long timestamp) {
- return new Stamped<>("", timestamp);
- }
+ private MinTimestampTracker<String> tracker = new MinTimestampTracker<>();
- @SuppressWarnings("unchecked")
@Test
- public void testTracking() {
- TimestampTracker<String> tracker = new MinTimestampTracker<>();
-
- Object[] elems = new Object[]{
- elem(100), elem(101), elem(102), elem(98), elem(99), elem(100)
- };
-
- int insertionIndex = 0;
- int removalIndex = 0;
-
- // add 100
- tracker.addElement((Stamped<String>) elems[insertionIndex++]);
- assertEquals(100L, tracker.get());
-
- // add 101
- tracker.addElement((Stamped<String>) elems[insertionIndex++]);
- assertEquals(100L, tracker.get());
-
- // remove 100
- tracker.removeElement((Stamped<String>) elems[removalIndex++]);
- assertEquals(101L, tracker.get());
-
- // add 102
- tracker.addElement((Stamped<String>) elems[insertionIndex++]);
- assertEquals(101L, tracker.get());
-
- // add 98
- tracker.addElement((Stamped<String>) elems[insertionIndex++]);
- assertEquals(98L, tracker.get());
-
- // add 99
- tracker.addElement((Stamped<String>) elems[insertionIndex++]);
- assertEquals(98L, tracker.get());
-
- // add 100
- tracker.addElement((Stamped<String>) elems[insertionIndex++]);
- assertEquals(98L, tracker.get());
+ public void shouldReturnNotKnownTimestampWhenNoRecordsEverAdded() throws Exception {
+ assertThat(tracker.get(), equalTo(TimestampTracker.NOT_KNOWN));
+ }
- // remove 101
- tracker.removeElement((Stamped<String>) elems[removalIndex++]);
- assertEquals(98L, tracker.get());
+ @Test
+ public void shouldReturnTimestampOfOnlyRecord() throws Exception {
+ tracker.addElement(elem(100));
+ assertThat(tracker.get(), equalTo(100L));
+ }
- // remove 102
- tracker.removeElement((Stamped<String>) elems[removalIndex++]);
- assertEquals(98L, tracker.get());
+ @Test
+ public void shouldReturnLowestAvailableTimestampFromAllInputs() throws Exception {
+ tracker.addElement(elem(100));
+ tracker.addElement(elem(99));
+ tracker.addElement(elem(102));
+ assertThat(tracker.get(), equalTo(99L));
+ }
- // remove 98
- tracker.removeElement((Stamped<String>) elems[removalIndex++]);
- assertEquals(99L, tracker.get());
+ @Test
+ public void shouldReturnLowestAvailableTimestampAfterPreviousLowestRemoved() throws Exception {
+ final Stamped<String> lowest = elem(88);
+ tracker.addElement(lowest);
+ tracker.addElement(elem(101));
+ tracker.addElement(elem(99));
+ tracker.removeElement(lowest);
+ assertThat(tracker.get(), equalTo(99L));
+ }
- // remove 99
- tracker.removeElement((Stamped<String>) elems[removalIndex++]);
- assertEquals(100L, tracker.get());
+ @Test
+ public void shouldReturnLastKnownTimestampWhenAllElementsHaveBeenRemoved() throws Exception {
+ final Stamped<String> record = elem(98);
+ tracker.addElement(record);
+ tracker.removeElement(record);
+ assertThat(tracker.get(), equalTo(98L));
+ }
- // remove 100
- tracker.removeElement((Stamped<String>) elems[removalIndex++]);
- assertEquals(100L, tracker.get());
+ @Test
+ public void shouldIgnoreNullRecordOnRemove() throws Exception {
+ tracker.removeElement(null);
+ }
- assertEquals(insertionIndex, removalIndex);
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionWhenTryingToAddNullElement() throws Exception {
+ tracker.addElement(null);
}
+ private Stamped<String> elem(final long timestamp) {
+ return new Stamped<>("", timestamp);
+ }
}
\ No newline at end of file