You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/12/15 18:51:45 UTC

[06/20] nifi git commit: NIFI-3198: Refactored how PublishKafka and PublishKafka_0_10 work to improve throughput and resilience. Fixed bug in StreamDemarcator. Slight refactoring of consume processors to simplify code.

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
new file mode 100644
index 0000000..f9f2485
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPublishKafka {
+    private static final String TOPIC_NAME = "unit-test"; // set on PublishKafka.TOPIC in setup() and expected by every publish(...) verification
+
+    private PublisherPool mockPool; // Mockito mock returned by the createPublisherPool override in setup()
+    private PublisherLease mockLease; // Mockito mock that mockPool.obtainPublisher() is stubbed to return
+    private TestRunner runner; // runner around the PublishKafka subclass built in setup()
+
+    /** Builds a PublishKafka whose publisher pool is a mock that always hands out {@code mockLease}. */
+    @Before
+    public void setup() {
+        mockLease = mock(PublisherLease.class);
+        mockPool = mock(PublisherPool.class);
+        when(mockPool.obtainPublisher()).thenReturn(mockLease);
+        // Override the pool factory method so that it returns the mock pool.
+        final PublishKafka processor = new PublishKafka() {
+            @Override
+            protected PublisherPool createPublisherPool(final ProcessContext context) {
+                return mockPool;
+            }
+        };
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PublishKafka.TOPIC, TOPIC_NAME);
+    }
+
+    /** One FlowFile that the lease reports as published must be routed to success without poisoning the lease. */
+    @Test
+    public void testSingleSuccess() throws IOException {
+        final MockFlowFile enqueued = runner.enqueue("hello world");
+        final PublishResult allPublished = createAllSuccessPublishResult(enqueued, 1);
+        when(mockLease.complete()).thenReturn(allPublished);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
+        verify(mockLease).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME)); // no mode == times(1)
+        verify(mockLease).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease).close();
+    }
+
+    /** Three FlowFiles all reported as published must all go to success, with one complete() and one close() on the lease. */
+    @Test
+    public void testMultipleSuccess() throws IOException {
+        final Set<FlowFile> enqueued = new HashSet<>();
+        for (int i = 0; i < 3; i++) {
+            enqueued.add(runner.enqueue("hello world"));
+        }
+        final PublishResult allPublished = createAllSuccessPublishResult(enqueued, 1);
+        when(mockLease.complete()).thenReturn(allPublished);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 3);
+
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease).complete(); // no mode == times(1)
+        verify(mockLease, times(0)).poison();
+        verify(mockLease).close();
+    }
+
+    /** One FlowFile that the lease reports as failed must be routed to failure. */
+    @Test
+    public void testSingleFailure() throws IOException {
+        final MockFlowFile enqueued = runner.enqueue("hello world");
+        final PublishResult allFailed = createFailurePublishResult(enqueued);
+        when(mockLease.complete()).thenReturn(allFailed);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_FAILURE, 1);
+        verify(mockLease).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME)); // no mode == times(1)
+        verify(mockLease).complete();
+        verify(mockLease).close();
+    }
+
+    /** Three FlowFiles all reported as failed must all be routed to failure. */
+    @Test
+    public void testMultipleFailures() throws IOException {
+        final Set<FlowFile> enqueued = new HashSet<>();
+        for (int i = 0; i < 3; i++) {
+            enqueued.add(runner.enqueue("hello world"));
+        }
+        final PublishResult allFailed = createFailurePublishResult(enqueued);
+        when(mockLease.complete()).thenReturn(allFailed);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_FAILURE, 3);
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease).complete(); // no mode == times(1)
+        verify(mockLease).close();
+    }
+
+    /** Each successful FlowFile must carry a msg.count attribute matching the count the PublishResult reports for it. */
+    @Test
+    public void testMultipleMessagesPerFlowFile() throws IOException {
+        final FlowFile first = runner.enqueue("hello world");
+        final FlowFile second = runner.enqueue("hello world");
+
+        final Map<FlowFile, Integer> countsByFlowFile = new HashMap<>();
+        countsByFlowFile.put(first, 10);
+        countsByFlowFile.put(second, 20);
+
+        final Set<FlowFile> published = new HashSet<>();
+        published.add(first);
+        published.add(second);
+        when(mockLease.complete()).thenReturn(createPublishResult(countsByFlowFile, published, Collections.emptyMap()));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 2);
+
+        verify(mockLease, times(2)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease).complete(); // no mode == times(1)
+        verify(mockLease, times(0)).poison();
+        verify(mockLease).close();
+
+        runner.assertAllFlowFilesContainAttribute("msg.count");
+        final List<String> reportedCounts = runner.getFlowFilesForRelationship(PublishKafka.REL_SUCCESS).stream()
+            .map(ff -> ff.getAttribute("msg.count"))
+            .collect(Collectors.toList());
+        assertEquals(1, Collections.frequency(reportedCounts, "10"));
+        assertEquals(1, Collections.frequency(reportedCounts, "20"));
+    }
+
+
+    /** With two FlowFiles published and two failed, each must be routed accordingly and only the successes get msg.count. */
+    @Test
+    public void testSomeSuccessSomeFailure() throws IOException {
+        final List<FlowFile> enqueued = new ArrayList<>();
+        for (int i = 0; i < 4; i++) {
+            enqueued.add(runner.enqueue("hello world"));
+        }
+        final List<FlowFile> toSucceed = enqueued.subList(0, 2);
+        final List<FlowFile> toFail = enqueued.subList(2, 4);
+
+        final Map<FlowFile, Integer> countsByFlowFile = new HashMap<>();
+        countsByFlowFile.put(toSucceed.get(0), 10);
+        countsByFlowFile.put(toSucceed.get(1), 20);
+
+        final Map<FlowFile, Exception> reasonsByFlowFile = new HashMap<>();
+        for (final FlowFile doomed : toFail) {
+            reasonsByFlowFile.put(doomed, new RuntimeException("Intentional Unit Test Exception"));
+        }
+
+        when(mockLease.complete()).thenReturn(createPublishResult(countsByFlowFile, new HashSet<>(toSucceed), reasonsByFlowFile));
+
+        runner.run();
+        runner.assertTransferCount(PublishKafka.REL_SUCCESS, 2);
+        runner.assertTransferCount(PublishKafka.REL_FAILURE, 2);
+
+        verify(mockLease, times(4)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease).complete(); // no mode == times(1)
+        verify(mockLease).close();
+
+        final List<String> successCounts = runner.getFlowFilesForRelationship(PublishKafka.REL_SUCCESS).stream()
+            .map(ff -> ff.getAttribute("msg.count"))
+            .collect(Collectors.toList());
+        assertEquals(1, Collections.frequency(successCounts, "10"));
+        assertEquals(1, Collections.frequency(successCounts, "20"));
+
+        assertTrue(runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).stream()
+            .allMatch(ff -> ff.getAttribute("msg.count") == null));
+    }
+
+
+    private PublishResult createAllSuccessPublishResult(final FlowFile successfulFlowFile, final int msgCount) {
+        return createAllSuccessPublishResult(Collections.singleton(successfulFlowFile), msgCount);
+    }
+
+    private PublishResult createAllSuccessPublishResult(final Set<FlowFile> successfulFlowFiles, final int msgCountPerFlowFile) {
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        for (final FlowFile ff : successfulFlowFiles) {
+            msgCounts.put(ff, msgCountPerFlowFile);
+        }
+        return createPublishResult(msgCounts, successfulFlowFiles, Collections.emptyMap());
+    }
+
+    private PublishResult createFailurePublishResult(final FlowFile failure) { // single-FlowFile convenience overload
+        return createFailurePublishResult(Collections.singleton(failure));
+    }
+
+    private PublishResult createFailurePublishResult(final Set<FlowFile> failures) { // no successes; each FlowFile maps to its own RuntimeException
+        return createPublishResult(Collections.emptyMap(), Collections.emptySet(),
+            failures.stream().collect(Collectors.toMap(failed -> failed, failed -> new RuntimeException("Intentional Unit Test Exception"))));
+    }
+
+    /** Wraps the given collections (not copies) in a PublishResult; rejects a FlowFile listed as both success and failure. */
+    private PublishResult createPublishResult(final Map<FlowFile, Integer> msgCounts, final Set<FlowFile> successFlowFiles, final Map<FlowFile, Exception> failures) {
+        for (final FlowFile candidate : successFlowFiles) {
+            if (failures.containsKey(candidate)) {
+                throw new IllegalArgumentException("Found same FlowFile in both 'success' and 'failures' collections: " + candidate);
+            }
+        }
+
+        return new PublishResult() {
+            @Override
+            public Collection<FlowFile> getSuccessfulFlowFiles() {
+                return successFlowFiles;
+            }
+
+            @Override
+            public int getSuccessfulMessageCount(FlowFile flowFile) {
+                final Integer reported = msgCounts.get(flowFile);
+                return reported != null ? reported : 0; // a FlowFile without an entry counts as zero messages
+            }
+
+            @Override
+            public Collection<FlowFile> getFailedFlowFiles() {
+                return failures.keySet();
+            }
+
+            @Override
+            public Exception getReasonForFailure(FlowFile flowFile) {
+                return failures.get(flowFile);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
new file mode 100644
index 0000000..c2d143c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+
+public class TestPublisherLease {
+    private ComponentLog logger; // Mockito mock assigned in setup()
+    private Producer<byte[], byte[]> producer; // Mockito mock assigned in setup()
+
+    @Before
+    @SuppressWarnings("unchecked")
+    public void setup() {
+        producer = Mockito.mock(Producer.class); // raw-typed mock assigned to a generic field, hence the unchecked suppression
+        logger = Mockito.mock(ComponentLog.class);
+    }
+
+    @Test
+    public void testPoisonOnException() throws IOException {
+        final AtomicInteger poisonCount = new AtomicInteger(0);
+
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger) {
+            @Override
+            public void poison() {
+                poisonCount.incrementAndGet();
+                super.poison();
+            }
+        };
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final String topic = "unit-test";
+        final byte[] messageKey = null;
+        final byte[] demarcatorBytes = null;
+
+        final InputStream failureInputStream = new InputStream() {
+            @Override
+            public int read() throws IOException {
+                throw new IOException("Intentional Unit Test Exception");
+            }
+        };
+
+        try {
+            lease.publish(flowFile, failureInputStream, messageKey, demarcatorBytes, topic);
+            Assert.fail("Expected IOException");
+        } catch (final IOException ioe) {
+            // expected
+        }
+
+        assertEquals(1, poisonCount.get());
+
+        final PublishResult result = lease.complete();
+        assertTrue(result.getFailedFlowFiles().contains(flowFile));
+        assertFalse(result.getSuccessfulFlowFiles().contains(flowFile));
+    }
+
+    /** A send callback completed with an exception must make the lease call poison() exactly once and fail the FlowFile. */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testPoisonOnFailure() throws IOException {
+        final AtomicInteger poisonCalls = new AtomicInteger(0);
+
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger) {
+            @Override
+            public void poison() {
+                poisonCalls.incrementAndGet();
+                super.poison();
+            }
+        };
+
+        // Every send(...) on the mock producer immediately completes its callback with an exception.
+        final Answer<Object> failEverySend = (final InvocationOnMock invocation) -> {
+            final Callback callback = invocation.getArgumentAt(1, Callback.class);
+            callback.onCompletion(null, new RuntimeException("Unit Test Intentional Exception"));
+            return null;
+        };
+        doAnswer(failEverySend).when(producer).send(any(ProducerRecord.class), any(Callback.class));
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final byte[] noKey = null;
+        final byte[] noDemarcator = null;
+
+        final InputStream oneByteContent = new ByteArrayInputStream(new byte[1]);
+        lease.publish(flowFile, oneByteContent, noKey, noDemarcator, "unit-test");
+
+        assertEquals(1, poisonCalls.get());
+
+        final PublishResult outcome = lease.complete();
+        assertTrue(outcome.getFailedFlowFiles().contains(flowFile));
+        assertFalse(outcome.getSuccessfulFlowFiles().contains(flowFile));
+    }
+
+    /**
+     * Publishes four newline-demarcated FlowFiles and checks that only the "1234567890" segments
+     * reach the producer (7 in total) and that flush() is not called until complete().
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testAllDelimitedMessagesSent() throws IOException {
+        final AtomicInteger poisonCalls = new AtomicInteger(0);
+
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger) {
+            @Override
+            protected void poison() {
+                poisonCalls.incrementAndGet();
+                super.poison();
+            }
+        };
+
+        // Tally every record handed to the producer by whether its value is exactly "1234567890".
+        final AtomicInteger expectedValues = new AtomicInteger(0);
+        final AtomicInteger unexpectedValues = new AtomicInteger(0);
+        final Answer<Object> tallySends = (final InvocationOnMock invocation) -> {
+            final ProducerRecord<byte[], byte[]> sent = invocation.getArgumentAt(0, ProducerRecord.class);
+            final String text = new String(sent.value(), StandardCharsets.UTF_8);
+            final AtomicInteger tally = "1234567890".equals(text) ? expectedValues : unexpectedValues;
+            tally.incrementAndGet();
+            return null; // the callback is never invoked; presumably why flowFile is expected among the failures below - confirm against PublisherLease
+        };
+        doAnswer(tallySends).when(producer).send(any(ProducerRecord.class), any(Callback.class));
+
+        final String topic = "unit-test";
+        final byte[] noKey = null;
+        final byte[] newline = "\n".getBytes(StandardCharsets.UTF_8);
+
+        // five "1234567890" segments separated by runs of empty segments
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final byte[] manyWithBlanks = "1234567890\n1234567890\n1234567890\n\n\n\n1234567890\n\n\n1234567890\n\n\n\n".getBytes(StandardCharsets.UTF_8);
+        lease.publish(flowFile, new ByteArrayInputStream(manyWithBlanks), noKey, newline, topic);
+
+        // empty content
+        lease.publish(new MockFlowFile(2L), new ByteArrayInputStream(new byte[0]), noKey, newline, topic);
+
+        // two segments, no trailing new line
+        final byte[] noTrailingNewline = "1234567890\n1234567890".getBytes(StandardCharsets.UTF_8);
+        lease.publish(new MockFlowFile(3L), new ByteArrayInputStream(noTrailingNewline), noKey, newline, topic);
+
+        // demarcators only
+        final byte[] onlyNewlines = "\n\n\n".getBytes(StandardCharsets.UTF_8);
+        lease.publish(new MockFlowFile(4L), new ByteArrayInputStream(onlyNewlines), noKey, newline, topic);
+
+        assertEquals(0, poisonCalls.get());
+        verify(producer, times(0)).flush();
+
+        final PublishResult outcome = lease.complete();
+        assertTrue(outcome.getFailedFlowFiles().contains(flowFile));
+        assertFalse(outcome.getSuccessfulFlowFiles().contains(flowFile));
+
+        assertEquals(7, expectedValues.get());
+        assertEquals(0, unexpectedValues.get());
+
+        verify(producer).flush(); // no mode == times(1)
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
new file mode 100644
index 0000000..7c70194
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+public class TestPublisherPool {
+
+    /** Creates a pool with byte-array serializers and bootstrap.servers=localhost:1111. NOTE(review): the tests never close the pool - confirm that is acceptable. */
+    private static PublisherPool newPool() {
+        final Map<String, Object> kafkaProperties = new HashMap<>();
+        kafkaProperties.put("bootstrap.servers", "localhost:1111");
+        kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName());
+        kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName());
+        return new PublisherPool(kafkaProperties, Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L);
+    }
+
+    /** Closing a healthy lease must raise the pool's available() count from 0 to 1. */
+    @Test
+    public void testLeaseCloseReturnsToPool() {
+        final PublisherPool pool = newPool();
+        assertEquals(0, pool.available());
+
+        final PublisherLease borrowed = pool.obtainPublisher();
+        assertEquals(0, pool.available());
+
+        borrowed.close();
+        assertEquals(1, pool.available());
+    }
+
+    /** Closing a lease after poison() must leave the pool's available() count at 0. */
+    @Test
+    public void testPoisonedLeaseNotReturnedToPool() {
+        final PublisherPool pool = newPool();
+        assertEquals(0, pool.available());
+
+        final PublisherLease borrowed = pool.obtainPublisher();
+        assertEquals(0, pool.available());
+
+        borrowed.poison();
+        borrowed.close();
+        assertEquals(0, pool.available());
+    }
+}