You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/01/13 07:53:29 UTC

[5/6] flink git commit: [FLINK-8306] [kafka, tests] Fix mock verifications on final method

[FLINK-8306] [kafka, tests] Fix mock verifications on final method

Previously, offset commit behavioural tests relied on verifying on
AbstractFetcher::commitInternalOffsetsToKafka(). That method is actually
final, and could not be mocked.

This commit fixes that by implementing a proper mock AbstractFetcher,
which keeps track of the offset commits that go through.

This closes #5284.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4bfc7de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4bfc7de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4bfc7de

Branch: refs/heads/release-1.4
Commit: c4bfc7de36201d7a144ae995931ffd3a079ed649
Parents: 968683f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Jan 12 08:45:32 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat Jan 13 11:34:20 2018 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 203 ++++++++++---------
 .../kafka/internals/AbstractFetcherTest.java    |  63 +-----
 .../kafka/testutils/TestSourceContext.java      |  87 ++++++++
 3 files changed, 199 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c4bfc7de/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 4361c67..f091c08 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -48,7 +47,9 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
 import org.apache.flink.streaming.connectors.kafka.testutils.TestPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.testutils.TestSourceContext;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.Preconditions;
@@ -56,11 +57,13 @@ import org.apache.flink.util.SerializedValue;
 
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
+
+import javax.annotation.Nonnull;
 
 import java.io.Serializable;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -68,6 +71,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.collection.IsIn.isIn;
@@ -75,14 +79,11 @@ import static org.hamcrest.collection.IsMapContaining.hasKey;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyMap;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link FlinkKafkaConsumerBase}.
@@ -129,7 +130,11 @@ public class FlinkKafkaConsumerBaseTest {
 	@Test
 	public void ignoreCheckpointWhenNotRunning() throws Exception {
 		@SuppressWarnings("unchecked")
-		final FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>();
+		final MockFetcher<String> fetcher = new MockFetcher<>();
+		final FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>(
+				fetcher,
+				mock(AbstractPartitionDiscoverer.class),
+				false);
 
 		final TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
 		setupConsumer(consumer, false, listState, true, 0, 1);
@@ -139,6 +144,11 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// no state should have been checkpointed
 		assertFalse(listState.get().iterator().hasNext());
+
+		// acknowledgement of the checkpoint should also not result in any offset commits
+		consumer.notifyCheckpointComplete(1L);
+		assertNull(fetcher.getAndClearLastCommittedOffsets());
+		assertEquals(0, fetcher.getCommitCount());
 	}
 
 	/**
@@ -265,10 +275,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// --------------------------------------------------------------------
 
-		final OneShotLatch runLatch = new OneShotLatch();
-		final OneShotLatch stopLatch = new OneShotLatch();
-		final AbstractFetcher<String, ?> fetcher = getRunnableMockFetcher(runLatch, stopLatch);
-		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
+		final MockFetcher<String> fetcher = new MockFetcher<>(state1, state2, state3);
 
 		final FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>(
 				fetcher,
@@ -283,17 +290,11 @@ public class FlinkKafkaConsumerBaseTest {
 		final CheckedThread runThread = new CheckedThread() {
 			@Override
 			public void go() throws Exception {
-				consumer.run(mock(SourceFunction.SourceContext.class));
-			}
-
-			@Override
-			public void sync() throws Exception {
-				stopLatch.trigger();
-				super.sync();
+				consumer.run(new TestSourceContext<>());
 			}
 		};
 		runThread.start();
-		runLatch.await();
+		fetcher.waitUntilRun();
 
 		assertEquals(0, consumer.getPendingOffsetsToCommit().size());
 
@@ -329,6 +330,8 @@ public class FlinkKafkaConsumerBaseTest {
 		consumer.notifyCheckpointComplete(138L);
 		assertEquals(1, consumer.getPendingOffsetsToCommit().size());
 		assertTrue(consumer.getPendingOffsetsToCommit().containsKey(140L));
+		assertEquals(state1, fetcher.getAndClearLastCommittedOffsets());
+		assertEquals(1, fetcher.getCommitCount());
 
 		// checkpoint 3
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
@@ -347,28 +350,15 @@ public class FlinkKafkaConsumerBaseTest {
 		// ack checkpoint 3, subsumes number 2
 		consumer.notifyCheckpointComplete(141L);
 		assertEquals(0, consumer.getPendingOffsetsToCommit().size());
+		assertEquals(state3, fetcher.getAndClearLastCommittedOffsets());
+		assertEquals(2, fetcher.getCommitCount());
 
 		consumer.notifyCheckpointComplete(666); // invalid checkpoint
 		assertEquals(0, consumer.getPendingOffsetsToCommit().size());
+		assertNull(fetcher.getAndClearLastCommittedOffsets());
+		assertEquals(2, fetcher.getCommitCount());
 
-		// create 500 snapshots
-		for (int i = 100; i < 600; i++) {
-			consumer.snapshotState(new StateSnapshotContextSynchronousImpl(i, i));
-			listState.clear();
-		}
-		assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, consumer.getPendingOffsetsToCommit().size());
-
-		// commit only the second last
-		consumer.notifyCheckpointComplete(598);
-		assertEquals(1, consumer.getPendingOffsetsToCommit().size());
-
-		// access invalid checkpoint
-		consumer.notifyCheckpointComplete(590);
-
-		// and the last
-		consumer.notifyCheckpointComplete(599);
-		assertEquals(0, consumer.getPendingOffsetsToCommit().size());
-
+		consumer.cancel();
 		runThread.sync();
 	}
 
@@ -393,10 +383,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// --------------------------------------------------------------------
 
-		final OneShotLatch runLatch = new OneShotLatch();
-		final OneShotLatch stopLatch = new OneShotLatch();
-		final AbstractFetcher<String, ?> fetcher = getRunnableMockFetcher(runLatch, stopLatch);
-		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
+		final MockFetcher<String> fetcher = new MockFetcher<>(state1, state2, state3);
 
 		final FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>(
 				fetcher,
@@ -412,17 +399,11 @@ public class FlinkKafkaConsumerBaseTest {
 		final CheckedThread runThread = new CheckedThread() {
 			@Override
 			public void go() throws Exception {
-				consumer.run(mock(SourceFunction.SourceContext.class));
-			}
-
-			@Override
-			public void sync() throws Exception {
-				stopLatch.trigger();
-				super.sync();
+				consumer.run(new TestSourceContext<>());
 			}
 		};
 		runThread.start();
-		runLatch.await();
+		fetcher.waitUntilRun();
 
 		assertEquals(0, consumer.getPendingOffsetsToCommit().size());
 
@@ -454,7 +435,8 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// ack checkpoint 1
 		consumer.notifyCheckpointComplete(138L);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
+		assertEquals(0, fetcher.getCommitCount());
+		assertNull(fetcher.getAndClearLastCommittedOffsets()); // no offsets should be committed
 
 		// checkpoint 3
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
@@ -471,29 +453,15 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// ack checkpoint 3, subsumes number 2
 		consumer.notifyCheckpointComplete(141L);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
+		assertEquals(0, fetcher.getCommitCount());
+		assertNull(fetcher.getAndClearLastCommittedOffsets()); // no offsets should be committed
 
 		consumer.notifyCheckpointComplete(666); // invalid checkpoint
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
-
-		// create 500 snapshots
-		for (int i = 100; i < 600; i++) {
-			consumer.snapshotState(new StateSnapshotContextSynchronousImpl(i, i));
-			listState.clear();
-		}
-		assertEquals(0, consumer.getPendingOffsetsToCommit().size()); // pending offsets to commit should not be updated
+		assertEquals(0, fetcher.getCommitCount());
+		assertNull(fetcher.getAndClearLastCommittedOffsets()); // no offsets should be committed
 
-		// commit only the second last
-		consumer.notifyCheckpointComplete(598);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
-
-		// access invalid checkpoint
-		consumer.notifyCheckpointComplete(590);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
-
-		// and the last
-		consumer.notifyCheckpointComplete(599);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
+		consumer.cancel();
+		runThread.sync();
 	}
 
 	@Test
@@ -734,26 +702,6 @@ public class FlinkKafkaConsumerBaseTest {
 		}
 	}
 
-	/**
-	 * Returns a mock {@link AbstractFetcher}, with run / stop latches injected in
-	 * the {@link AbstractFetcher#runFetchLoop()} method.
-	 */
-	private static <T> AbstractFetcher<T, ?> getRunnableMockFetcher(
-			OneShotLatch runLatch,
-			OneShotLatch stopLatch) throws Exception {
-
-		@SuppressWarnings("unchecked")
-		final AbstractFetcher<T, ?> fetcher = mock(AbstractFetcher.class);
-
-		Mockito.doAnswer(invocationOnMock -> {
-			runLatch.trigger();
-			stopLatch.await();
-			return null;
-		}).when(fetcher).runFetchLoop();
-
-		return fetcher;
-	}
-
 	@SuppressWarnings("unchecked")
 	private static <T, S> void setupConsumer(
 			FlinkKafkaConsumerBase<T> consumer,
@@ -769,6 +717,77 @@ public class FlinkKafkaConsumerBaseTest {
 		consumer.open(new Configuration());
 	}
 
+	private static class MockFetcher<T> extends AbstractFetcher<T, Object> {
+
+		private final OneShotLatch runLatch = new OneShotLatch();
+		private final OneShotLatch stopLatch = new OneShotLatch();
+
+		private final ArrayDeque<HashMap<KafkaTopicPartition, Long>> stateSnapshotsToReturn = new ArrayDeque<>();
+
+		private Map<KafkaTopicPartition, Long> lastCommittedOffsets;
+		private int commitCount = 0;
+
+		@SafeVarargs
+		private MockFetcher(HashMap<KafkaTopicPartition, Long>... stateSnapshotsToReturn) throws Exception {
+			super(
+					new TestSourceContext<>(),
+					new HashMap<>(),
+					null,
+					null,
+					new TestProcessingTimeService(),
+					0,
+					MockFetcher.class.getClassLoader(),
+					false);
+
+			this.stateSnapshotsToReturn.addAll(Arrays.asList(stateSnapshotsToReturn));
+		}
+
+		@Override
+		protected void doCommitInternalOffsetsToKafka(
+				Map<KafkaTopicPartition, Long> offsets,
+				@Nonnull KafkaCommitCallback commitCallback) throws Exception {
+			this.lastCommittedOffsets = offsets;
+			this.commitCount++;
+			commitCallback.onSuccess();
+		}
+
+		@Override
+		public void runFetchLoop() throws Exception {
+			runLatch.trigger();
+			stopLatch.await();
+		}
+
+		@Override
+		public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+			checkState(!stateSnapshotsToReturn.isEmpty());
+			return stateSnapshotsToReturn.poll();
+		}
+
+		@Override
+		protected Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void cancel() {
+			stopLatch.trigger();
+		}
+
+		private void waitUntilRun() throws InterruptedException {
+			runLatch.await();
+		}
+
+		private Map<KafkaTopicPartition, Long> getAndClearLastCommittedOffsets() {
+			Map<KafkaTopicPartition, Long> offsets = this.lastCommittedOffsets;
+			this.lastCommittedOffsets = null;
+			return offsets;
+		}
+
+		private int getCommitCount() {
+			return commitCount;
+		}
+	}
+
 	private static class MockRuntimeContext extends StreamingRuntimeContext {
 
 		private final boolean isCheckpointingEnabled;
@@ -784,7 +803,7 @@ public class FlinkKafkaConsumerBaseTest {
 			super(
 				new MockStreamOperator(),
 				new MockEnvironment("mockTask", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
-				Collections.<String, Accumulator<?, ?>>emptyMap());
+				Collections.emptyMap());
 
 			this.isCheckpointingEnabled = isCheckpointingEnabled;
 			this.numParallelSubtasks = numParallelSubtasks;

http://git-wip-us.apache.org/repos/asf/flink/blob/c4bfc7de/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index e4a58dd..6fe1d6f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.connectors.kafka.testutils.TestSourceContext;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.util.SerializedValue;
@@ -444,67 +444,6 @@ public class AbstractFetcherTest {
 
 	// ------------------------------------------------------------------------
 
-	private static final class TestSourceContext<T> implements SourceContext<T> {
-
-		private final Object checkpointLock = new Object();
-		private final Object watermarkLock = new Object();
-
-		private volatile StreamRecord<T> latestElement;
-		private volatile Watermark currentWatermark;
-
-		@Override
-		public void collect(T element) {
-			this.latestElement = new StreamRecord<>(element);
-		}
-
-		@Override
-		public void collectWithTimestamp(T element, long timestamp) {
-			this.latestElement = new StreamRecord<>(element, timestamp);
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			synchronized (watermarkLock) {
-				currentWatermark = mark;
-				watermarkLock.notifyAll();
-			}
-		}
-
-		@Override
-		public void markAsTemporarilyIdle() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public Object getCheckpointLock() {
-			return checkpointLock;
-		}
-
-		@Override
-		public void close() {}
-
-		public StreamRecord<T> getLatestElement() {
-			return latestElement;
-		}
-
-		public boolean hasWatermark() {
-			return currentWatermark != null;
-		}
-
-		public Watermark getLatestWatermark() throws InterruptedException {
-			synchronized (watermarkLock) {
-				while (currentWatermark == null) {
-					watermarkLock.wait();
-				}
-				Watermark wm = currentWatermark;
-				currentWatermark = null;
-				return wm;
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
 	private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
 
 		private volatile long maxTimestamp = Long.MIN_VALUE;

http://git-wip-us.apache.org/repos/asf/flink/blob/c4bfc7de/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TestSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TestSourceContext.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TestSourceContext.java
new file mode 100644
index 0000000..2a96a68
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TestSourceContext.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Test {@link org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}.
+ */
+public final class TestSourceContext<T> implements SourceFunction.SourceContext<T> {
+
+	private final Object checkpointLock = new Object();
+	private final Object watermarkLock = new Object();
+
+	private volatile StreamRecord<T> latestElement;
+	private volatile Watermark currentWatermark;
+
+	@Override
+	public void collect(T element) {
+		this.latestElement = new StreamRecord<>(element);
+	}
+
+	@Override
+	public void collectWithTimestamp(T element, long timestamp) {
+		this.latestElement = new StreamRecord<>(element, timestamp);
+	}
+
+	@Override
+	public void emitWatermark(Watermark mark) {
+		synchronized (watermarkLock) {
+			currentWatermark = mark;
+			watermarkLock.notifyAll();
+		}
+	}
+
+	@Override
+	public void markAsTemporarilyIdle() {
+		// do nothing
+	}
+
+	@Override
+	public Object getCheckpointLock() {
+		return checkpointLock;
+	}
+
+	@Override
+	public void close() {
+		// do nothing
+	}
+
+	public StreamRecord<T> getLatestElement() {
+		return latestElement;
+	}
+
+	public boolean hasWatermark() {
+		return currentWatermark != null;
+	}
+
+	public Watermark getLatestWatermark() throws InterruptedException {
+		synchronized (watermarkLock) {
+			while (currentWatermark == null) {
+				watermarkLock.wait();
+			}
+			Watermark wm = currentWatermark;
+			currentWatermark = null;
+			return wm;
+		}
+	}
+}