You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/05 13:46:14 UTC

svn commit: r1197948 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/main/java/org/apache/camel/impl/ components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/ components/camel-aws/src/main/java/o...

Author: davsclaus
Date: Sat Nov  5 12:46:13 2011
New Revision: 1197948

URL: http://svn.apache.org/viewvc?rev=1197948&view=rev
Log:
CAMEL-4605: Fixed an issue when stopping Batch Consumer routes that are configured with ShutdownRunningTask.CompleteAllTasks: there was a slim chance that the route would be shut down too early, before the last batch message had a chance to be enlisted as an in-flight exchange.

Added:
    camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/
    camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
    camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
    camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java
    camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
    camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
    camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
    camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
    camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
    camel/trunk/tests/camel-itest/pom.xml

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Sat Nov  5 12:46:13 2011
@@ -195,12 +195,24 @@ public abstract class GenericFileConsume
     }
 
     public int getPendingExchangesSize() {
+        int answer;
         // only return the real pending size in case we are configured to complete all tasks
         if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            return pendingExchanges;
+            answer = pendingExchanges;
         } else {
-            return 0;
+            answer = 0;
         }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there is a little gap
+            // in the processBatch method and until an exchange gets enlisted as in-flight
+            // which happens later, so we need to signal back to the shutdown strategy that
+            // there is a pending exchange. When we are no longer polling, then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
     }
 
     public void prepareShutdown() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Sat Nov  5 12:46:13 2011
@@ -52,6 +52,7 @@ public abstract class ScheduledPollConsu
     private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
     private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;
     private boolean sendEmptyMessageWhenIdle;
+    private volatile boolean polling;
 
     public ScheduledPollConsumer(Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -130,19 +131,25 @@ public abstract class ScheduledPollConsu
                         LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint());
                     }
 
-                    boolean begin = pollStrategy.begin(this, getEndpoint());
-                    if (begin) {
-                        retryCounter++;
-                        int polledMessages = poll();
-                        
-                        if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) {
-                            // send an "empty" exchange
-                            processEmptyMessage();
+                    // mark we are polling which should also include the begin/poll/commit
+                    polling = true;
+                    try {
+                        boolean begin = pollStrategy.begin(this, getEndpoint());
+                        if (begin) {
+                            retryCounter++;
+                            int polledMessages = poll();
+
+                            if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) {
+                                // send an "empty" exchange
+                                processEmptyMessage();
+                            }
+
+                            pollStrategy.commit(this, getEndpoint(), polledMessages);
+                        } else {
+                            LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
                         }
-                        
-                        pollStrategy.commit(this, getEndpoint(), polledMessages);
-                    } else {
-                        LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
+                    } finally {
+                        polling = false;
                     }
                 }
 
@@ -190,6 +197,13 @@ public abstract class ScheduledPollConsu
         return isRunAllowed() && !isSuspended();
     }
 
+    /**
+     * Whether polling is currently in progress
+     */
+    protected boolean isPolling() {
+        return polling;
+    }
+
     public long getInitialDelay() {
         return initialDelay;
     }

Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java Sat Nov  5 12:46:13 2011
@@ -190,12 +190,24 @@ public class S3Consumer extends Schedule
     }
 
     public int getPendingExchangesSize() {
+        int answer;
         // only return the real pending size in case we are configured to complete all tasks
         if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            return pendingExchanges;
+            answer = pendingExchanges;
         } else {
-            return 0;
+            answer = 0;
         }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there is a little gap
+            // in the processBatch method and until an exchange gets enlisted as in-flight
+            // which happens later, so we need to signal back to the shutdown strategy that
+            // there is a pending exchange. When we are no longer polling, then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
     }
 
     public void prepareShutdown() {

Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java Sat Nov  5 12:46:13 2011
@@ -190,12 +190,24 @@ public class SqsConsumer extends Schedul
     }
 
     public int getPendingExchangesSize() {
+        int answer;
         // only return the real pending size in case we are configured to complete all tasks
         if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            return pendingExchanges;
+            answer = pendingExchanges;
         } else {
-            return 0;
+            answer = 0;
         }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there is a little gap
+            // in the processBatch method and until an exchange gets enlisted as in-flight
+            // which happens later, so we need to signal back to the shutdown strategy that
+            // there is a pending exchange. When we are no longer polling, then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
     }
 
     public void prepareShutdown() {

Modified: camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java (original)
+++ camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java Sat Nov  5 12:46:13 2011
@@ -169,12 +169,24 @@ public class IBatisConsumer extends Sche
     }
 
     public int getPendingExchangesSize() {
+        int answer;
         // only return the real pending size in case we are configured to complete all tasks
         if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            return pendingExchanges;
+            answer = pendingExchanges;
         } else {
-            return 0;
+            answer = 0;
         }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there is a little gap
+            // in the processBatch method and until an exchange gets enlisted as in-flight
+            // which happens later, so we need to signal back to the shutdown strategy that
+            // there is a pending exchange. When we are no longer polling, then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
     }
 
     public void prepareShutdown() {

Modified: camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java (original)
+++ camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java Sat Nov  5 12:46:13 2011
@@ -141,12 +141,24 @@ public class JcloudsBlobStoreConsumer ex
     }
 
     public int getPendingExchangesSize() {
+        int answer;
         // only return the real pending size in case we are configured to complete all tasks
         if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            return pendingExchanges;
+            answer = pendingExchanges;
         } else {
-            return 0;
+            answer = 0;
         }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there is a little gap
+            // in the processBatch method and until an exchange gets enlisted as in-flight
+            // which happens later, so we need to signal back to the shutdown strategy that
+            // there is a pending exchange. When we are no longer polling, then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
     }
 
     @Override

Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Sat Nov  5 12:46:13 2011
@@ -158,12 +158,24 @@ public class JpaConsumer extends Schedul
     }
 
     public int getPendingExchangesSize() {
+        int answer;
         // only return the real pending size in case we are configured to complete all tasks
         if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            return pendingExchanges;
+            answer = pendingExchanges;
         } else {
-            return 0;
+            answer = 0;
         }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there is a little gap
+            // in the processBatch method and until an exchange gets enlisted as in-flight
+            // which happens later, so we need to signal back to the shutdown strategy that
+            // there is a pending exchange. When we are no longer polling, then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
     }
 
     public void prepareShutdown() {

Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java (original)
+++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java Sat Nov  5 12:46:13 2011
@@ -151,12 +151,24 @@ public class KratiConsumer extends Sched
 
     @Override
     public int getPendingExchangesSize() {
+        int answer;
         // only return the real pending size in case we are configured to complete all tasks
         if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            return pendingExchanges;
+            answer = pendingExchanges;
         } else {
-            return 0;
+            answer = 0;
         }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there is a little gap
+            // in the processBatch method and until an exchange gets enlisted as in-flight
+            // which happens later, so we need to signal back to the shutdown strategy that
+            // there is a pending exchange. When we are no longer polling, then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
     }
 
     @Override

Modified: camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original)
+++ camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Sat Nov  5 12:46:13 2011
@@ -211,12 +211,24 @@ public class MailConsumer extends Schedu
     }
 
     public int getPendingExchangesSize() {
+        int answer;
         // only return the real pending size in case we are configured to complete all tasks
         if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            return pendingExchanges;
+            answer = pendingExchanges;
         } else {
-            return 0;
+            answer = 0;
         }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there is a little gap
+            // in the processBatch method and until an exchange gets enlisted as in-flight
+            // which happens later, so we need to signal back to the shutdown strategy that
+            // there is a pending exchange. When we are no longer polling, then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
     }
 
     public void prepareShutdown() {

Modified: camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java (original)
+++ camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java Sat Nov  5 12:46:13 2011
@@ -169,12 +169,24 @@ public class MyBatisConsumer extends Sch
     }
 
     public int getPendingExchangesSize() {
+        int answer;
         // only return the real pending size in case we are configured to complete all tasks
         if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            return pendingExchanges;
+            answer = pendingExchanges;
         } else {
-            return 0;
+            answer = 0;
         }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there is a little gap
+            // in the processBatch method and until an exchange gets enlisted as in-flight
+            // which happens later, so we need to signal back to the shutdown strategy that
+            // there is a pending exchange. When we are no longer polling, then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
     }
 
     public void prepareShutdown() {

Modified: camel/trunk/tests/camel-itest/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/pom.xml?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/tests/camel-itest/pom.xml (original)
+++ camel/trunk/tests/camel-itest/pom.xml Sat Nov  5 12:46:13 2011
@@ -100,6 +100,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
+            <artifactId>camel-quartz</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
             <artifactId>camel-rss</artifactId>
             <scope>test</scope>
             <!-- conflicts with mockmail for unit testing, so we exclude this geronimo spec -->

Added: camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java?rev=1197948&view=auto
==============================================================================
--- camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java (added)
+++ camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java Sat Nov  5 12:46:13 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.quartz;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.builder.ThreadPoolProfileBuilder;
+import org.apache.camel.routepolicy.quartz.CronScheduledRoutePolicy;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.ftpserver.FtpServer;
+import org.apache.ftpserver.FtpServerFactory;
+import org.apache.ftpserver.filesystem.nativefs.NativeFileSystemFactory;
+import org.apache.ftpserver.ftplet.UserManager;
+import org.apache.ftpserver.listener.ListenerFactory;
+import org.apache.ftpserver.usermanager.ClearTextPasswordEncryptor;
+import org.apache.ftpserver.usermanager.impl.PropertiesUserManager;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
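+ * Manual test of an FTP consumer route using CompleteAllTasks shutdown that is started and stopped by a CronScheduledRoutePolicy.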
+ */
+@Ignore("Manual test")
+public class FtpCronScheduledRoutePolicyTest extends CamelTestSupport {
+
+    protected FtpServer ftpServer;
+    private String ftp = "ftp:localhost:20128/myapp?password=admin&username=admin&delay=5s&idempotent=false&localWorkDirectory=target/tmp";
+
+    @Test
+    public void testFtpCronScheduledRoutePolicyTest() throws Exception {
+        template.sendBodyAndHeader("file:res/home/myapp", "Hello World", Exchange.FILE_NAME, "hello.txt");
+
+        Thread.sleep(10 * 1000 * 60);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                CronScheduledRoutePolicy policy = new CronScheduledRoutePolicy();
+                policy.setRouteStartTime("* 0/2 * * * ?");
+                policy.setRouteStopTime("* 1/2 * * * ?");
+                policy.setRouteStopGracePeriod(250);
+                policy.setTimeUnit(TimeUnit.SECONDS);
+
+                ThreadPoolProfile profile = new ThreadPoolProfileBuilder("foo")
+                    .poolSize(2).maxPoolSize(2).maxPoolSize(-1).build();
+
+                context.getExecutorServiceManager().registerThreadPoolProfile(profile);
+
+                from(ftp)
+                    .noAutoStartup().routePolicy(policy).shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks)
+                    .log("Processing ${file:name}")
+                    .to("log:done");
+            }
+        };
+    }
+
+    public void setUp() throws Exception {
+        super.setUp();
+        deleteDirectory("res");
+        createDirectory("res/home/myapp");
+        initFtpServer();
+        ftpServer.start();
+    }
+
+    public void tearDown() throws Exception {
+        super.tearDown();
+        ftpServer.stop();
+        ftpServer = null;
+    }
+
+    protected void initFtpServer() throws Exception {
+        FtpServerFactory serverFactory = new FtpServerFactory();
+
+        // setup user management to read our users.properties and use clear text passwords
+        File file = new File("./src/test/resources/users.properties").getAbsoluteFile();
+        UserManager uman = new PropertiesUserManager(new ClearTextPasswordEncryptor(), file, "admin");
+        serverFactory.setUserManager(uman);
+
+        NativeFileSystemFactory fsf = new NativeFileSystemFactory();
+        fsf.setCreateHome(true);
+        serverFactory.setFileSystem(fsf);
+
+        ListenerFactory factory = new ListenerFactory();
+        factory.setPort(20128);
+        serverFactory.addListener("default", factory.createListener());
+
+        ftpServer = serverFactory.createServer();
+    }
+
+}