You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/12/15 18:51:45 UTC
[06/20] nifi git commit: NIFI-3198: Refactored how PublishKafka and
PublishKafka_0_10 work to improve throughput and resilience. Fixed bug in
StreamDemarcator. Slight refactoring of consume processors to simplify code.
http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
new file mode 100644
index 0000000..f9f2485
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPublishKafka {
+ private static final String TOPIC_NAME = "unit-test";
+
+ private PublisherPool mockPool;
+ private PublisherLease mockLease;
+ private TestRunner runner;
+
+ @Before
+ public void setup() {
+ mockPool = mock(PublisherPool.class);
+ mockLease = mock(PublisherLease.class);
+
+ when(mockPool.obtainPublisher()).thenReturn(mockLease);
+
+ runner = TestRunners.newTestRunner(new PublishKafka() {
+ @Override
+ protected PublisherPool createPublisherPool(final ProcessContext context) {
+ return mockPool;
+ }
+ });
+
+ runner.setProperty(PublishKafka.TOPIC, TOPIC_NAME);
+ }
+
+ @Test
+ public void testSingleSuccess() throws IOException {
+ final MockFlowFile flowFile = runner.enqueue("hello world");
+
+ when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFile, 1));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
+
+ verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(1)).complete();
+ verify(mockLease, times(0)).poison();
+ verify(mockLease, times(1)).close();
+ }
+
+ @Test
+ public void testMultipleSuccess() throws IOException {
+ final Set<FlowFile> flowFiles = new HashSet<>();
+ flowFiles.add(runner.enqueue("hello world"));
+ flowFiles.add(runner.enqueue("hello world"));
+ flowFiles.add(runner.enqueue("hello world"));
+
+
+ when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 1));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 3);
+
+ verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(1)).complete();
+ verify(mockLease, times(0)).poison();
+ verify(mockLease, times(1)).close();
+ }
+
+ @Test
+ public void testSingleFailure() throws IOException {
+ final MockFlowFile flowFile = runner.enqueue("hello world");
+
+ when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafka.REL_FAILURE, 1);
+
+ verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(1)).complete();
+ verify(mockLease, times(1)).close();
+ }
+
+ @Test
+ public void testMultipleFailures() throws IOException {
+ final Set<FlowFile> flowFiles = new HashSet<>();
+ flowFiles.add(runner.enqueue("hello world"));
+ flowFiles.add(runner.enqueue("hello world"));
+ flowFiles.add(runner.enqueue("hello world"));
+
+ when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafka.REL_FAILURE, 3);
+
+ verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(1)).complete();
+ verify(mockLease, times(1)).close();
+ }
+
+ @Test
+ public void testMultipleMessagesPerFlowFile() throws IOException {
+ final List<FlowFile> flowFiles = new ArrayList<>();
+ flowFiles.add(runner.enqueue("hello world"));
+ flowFiles.add(runner.enqueue("hello world"));
+
+ final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+ msgCounts.put(flowFiles.get(0), 10);
+ msgCounts.put(flowFiles.get(1), 20);
+
+ final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap());
+
+ when(mockLease.complete()).thenReturn(result);
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 2);
+
+ verify(mockLease, times(2)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(1)).complete();
+ verify(mockLease, times(0)).poison();
+ verify(mockLease, times(1)).close();
+
+ runner.assertAllFlowFilesContainAttribute("msg.count");
+ assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka.REL_SUCCESS).stream()
+ .filter(ff -> ff.getAttribute("msg.count").equals("10"))
+ .count());
+ assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka.REL_SUCCESS).stream()
+ .filter(ff -> ff.getAttribute("msg.count").equals("20"))
+ .count());
+ }
+
+
+ @Test
+ public void testSomeSuccessSomeFailure() throws IOException {
+ final List<FlowFile> flowFiles = new ArrayList<>();
+ flowFiles.add(runner.enqueue("hello world"));
+ flowFiles.add(runner.enqueue("hello world"));
+ flowFiles.add(runner.enqueue("hello world"));
+ flowFiles.add(runner.enqueue("hello world"));
+
+ final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+ msgCounts.put(flowFiles.get(0), 10);
+ msgCounts.put(flowFiles.get(1), 20);
+
+ final Map<FlowFile, Exception> failureMap = new HashMap<>();
+ failureMap.put(flowFiles.get(2), new RuntimeException("Intentional Unit Test Exception"));
+ failureMap.put(flowFiles.get(3), new RuntimeException("Intentional Unit Test Exception"));
+
+ final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles.subList(0, 2)), failureMap);
+
+ when(mockLease.complete()).thenReturn(result);
+
+ runner.run();
+ runner.assertTransferCount(PublishKafka.REL_SUCCESS, 2);
+ runner.assertTransferCount(PublishKafka.REL_FAILURE, 2);
+
+ verify(mockLease, times(4)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(1)).complete();
+ verify(mockLease, times(1)).close();
+
+ assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka.REL_SUCCESS).stream()
+ .filter(ff -> "10".equals(ff.getAttribute("msg.count")))
+ .count());
+ assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka.REL_SUCCESS).stream()
+ .filter(ff -> "20".equals(ff.getAttribute("msg.count")))
+ .count());
+
+ assertTrue(runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).stream()
+ .noneMatch(ff -> ff.getAttribute("msg.count") != null));
+ }
+
+
+ private PublishResult createAllSuccessPublishResult(final FlowFile successfulFlowFile, final int msgCount) {
+ return createAllSuccessPublishResult(Collections.singleton(successfulFlowFile), msgCount);
+ }
+
+ private PublishResult createAllSuccessPublishResult(final Set<FlowFile> successfulFlowFiles, final int msgCountPerFlowFile) {
+ final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+ for (final FlowFile ff : successfulFlowFiles) {
+ msgCounts.put(ff, msgCountPerFlowFile);
+ }
+ return createPublishResult(msgCounts, successfulFlowFiles, Collections.emptyMap());
+ }
+
+ private PublishResult createFailurePublishResult(final FlowFile failure) {
+ return createFailurePublishResult(Collections.singleton(failure));
+ }
+
+ private PublishResult createFailurePublishResult(final Set<FlowFile> failures) {
+ final Map<FlowFile, Exception> failureMap = failures.stream().collect(Collectors.toMap(ff -> ff, ff -> new RuntimeException("Intentional Unit Test Exception")));
+ return createPublishResult(Collections.emptyMap(), Collections.emptySet(), failureMap);
+ }
+
+ private PublishResult createPublishResult(final Map<FlowFile, Integer> msgCounts, final Set<FlowFile> successFlowFiles, final Map<FlowFile, Exception> failures) {
+ // sanity check.
+ for (final FlowFile success : successFlowFiles) {
+ if (failures.containsKey(success)) {
+ throw new IllegalArgumentException("Found same FlowFile in both 'success' and 'failures' collections: " + success);
+ }
+ }
+
+ return new PublishResult() {
+ @Override
+ public Collection<FlowFile> getSuccessfulFlowFiles() {
+ return successFlowFiles;
+ }
+
+ @Override
+ public Collection<FlowFile> getFailedFlowFiles() {
+ return failures.keySet();
+ }
+
+ @Override
+ public int getSuccessfulMessageCount(FlowFile flowFile) {
+ Integer count = msgCounts.get(flowFile);
+ return count == null ? 0 : count.intValue();
+ }
+
+ @Override
+ public Exception getReasonForFailure(FlowFile flowFile) {
+ return failures.get(flowFile);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
new file mode 100644
index 0000000..c2d143c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+
+public class TestPublisherLease {
+ private ComponentLog logger;
+ private Producer<byte[], byte[]> producer;
+
+ @Before
+ @SuppressWarnings("unchecked")
+ public void setup() {
+ logger = Mockito.mock(ComponentLog.class);
+ producer = Mockito.mock(Producer.class);
+ }
+
+ @Test
+ public void testPoisonOnException() throws IOException {
+ final AtomicInteger poisonCount = new AtomicInteger(0);
+
+ final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger) {
+ @Override
+ public void poison() {
+ poisonCount.incrementAndGet();
+ super.poison();
+ }
+ };
+
+ final FlowFile flowFile = new MockFlowFile(1L);
+ final String topic = "unit-test";
+ final byte[] messageKey = null;
+ final byte[] demarcatorBytes = null;
+
+ final InputStream failureInputStream = new InputStream() {
+ @Override
+ public int read() throws IOException {
+ throw new IOException("Intentional Unit Test Exception");
+ }
+ };
+
+ try {
+ lease.publish(flowFile, failureInputStream, messageKey, demarcatorBytes, topic);
+ Assert.fail("Expected IOException");
+ } catch (final IOException ioe) {
+ // expected
+ }
+
+ assertEquals(1, poisonCount.get());
+
+ final PublishResult result = lease.complete();
+ assertTrue(result.getFailedFlowFiles().contains(flowFile));
+ assertFalse(result.getSuccessfulFlowFiles().contains(flowFile));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testPoisonOnFailure() throws IOException {
+ final AtomicInteger poisonCount = new AtomicInteger(0);
+
+ final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger) {
+ @Override
+ public void poison() {
+ poisonCount.incrementAndGet();
+ super.poison();
+ }
+ };
+
+ final FlowFile flowFile = new MockFlowFile(1L);
+ final String topic = "unit-test";
+ final byte[] messageKey = null;
+ final byte[] demarcatorBytes = null;
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable {
+ final Callback callback = invocation.getArgumentAt(1, Callback.class);
+ callback.onCompletion(null, new RuntimeException("Unit Test Intentional Exception"));
+ return null;
+ }
+ }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
+
+ lease.publish(flowFile, new ByteArrayInputStream(new byte[1]), messageKey, demarcatorBytes, topic);
+
+ assertEquals(1, poisonCount.get());
+
+ final PublishResult result = lease.complete();
+ assertTrue(result.getFailedFlowFiles().contains(flowFile));
+ assertFalse(result.getSuccessfulFlowFiles().contains(flowFile));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testAllDelimitedMessagesSent() throws IOException {
+ final AtomicInteger poisonCount = new AtomicInteger(0);
+
+ final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger) {
+ @Override
+ protected void poison() {
+ poisonCount.incrementAndGet();
+ super.poison();
+ }
+ };
+
+ final AtomicInteger correctMessages = new AtomicInteger(0);
+ final AtomicInteger incorrectMessages = new AtomicInteger(0);
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
+ final byte[] value = record.value();
+ final String valueString = new String(value, StandardCharsets.UTF_8);
+ if ("1234567890".equals(valueString)) {
+ correctMessages.incrementAndGet();
+ } else {
+ incorrectMessages.incrementAndGet();
+ }
+
+ return null;
+ }
+ }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
+
+ final FlowFile flowFile = new MockFlowFile(1L);
+ final String topic = "unit-test";
+ final byte[] messageKey = null;
+ final byte[] demarcatorBytes = "\n".getBytes(StandardCharsets.UTF_8);
+
+ final byte[] flowFileContent = "1234567890\n1234567890\n1234567890\n\n\n\n1234567890\n\n\n1234567890\n\n\n\n".getBytes(StandardCharsets.UTF_8);
+ lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
+
+ final byte[] flowFileContent2 = new byte[0];
+ lease.publish(new MockFlowFile(2L), new ByteArrayInputStream(flowFileContent2), messageKey, demarcatorBytes, topic);
+
+ final byte[] flowFileContent3 = "1234567890\n1234567890".getBytes(StandardCharsets.UTF_8); // no trailing new line
+ lease.publish(new MockFlowFile(3L), new ByteArrayInputStream(flowFileContent3), messageKey, demarcatorBytes, topic);
+
+ final byte[] flowFileContent4 = "\n\n\n".getBytes(StandardCharsets.UTF_8);
+ lease.publish(new MockFlowFile(4L), new ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic);
+
+ assertEquals(0, poisonCount.get());
+
+ verify(producer, times(0)).flush();
+
+ final PublishResult result = lease.complete();
+ assertTrue(result.getFailedFlowFiles().contains(flowFile));
+ assertFalse(result.getSuccessfulFlowFiles().contains(flowFile));
+
+ assertEquals(7, correctMessages.get());
+ assertEquals(0, incorrectMessages.get());
+
+ verify(producer, times(1)).flush();
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
new file mode 100644
index 0000000..7c70194
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+public class TestPublisherPool {
+
+ @Test
+ public void testLeaseCloseReturnsToPool() {
+ final Map<String, Object> kafkaProperties = new HashMap<>();
+ kafkaProperties.put("bootstrap.servers", "localhost:1111");
+ kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName());
+ kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName());
+
+ final PublisherPool pool = new PublisherPool(kafkaProperties, Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L);
+ assertEquals(0, pool.available());
+
+ final PublisherLease lease = pool.obtainPublisher();
+ assertEquals(0, pool.available());
+
+ lease.close();
+ assertEquals(1, pool.available());
+ }
+
+ @Test
+ public void testPoisonedLeaseNotReturnedToPool() {
+ final Map<String, Object> kafkaProperties = new HashMap<>();
+ kafkaProperties.put("bootstrap.servers", "localhost:1111");
+ kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName());
+ kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName());
+
+ final PublisherPool pool = new PublisherPool(kafkaProperties, Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L);
+ assertEquals(0, pool.available());
+
+ final PublisherLease lease = pool.obtainPublisher();
+ assertEquals(0, pool.available());
+
+ lease.poison();
+ lease.close();
+ assertEquals(0, pool.available());
+ }
+
+}