You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by an...@apache.org on 2012/01/17 16:16:38 UTC

svn commit: r1232438 - /openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java

Author: andygumbrecht
Date: Tue Jan 17 15:16:38 2012
New Revision: 1232438

URL: http://svn.apache.org/viewvc?rev=1232438&view=rev
Log:
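The keep-alive Timer is now created and scheduled in start() instead of the constructor, so it does not run unless the service is started; stop() cancels it.
Reduce logging noise: socket-close errors are logged at warning level without a stack trace (previously info level with the stack trace); the stack trace is only logged at debug level.
Cleanup: rename the 'stop' flag to 'running', add final modifiers and @Override annotations, remove unused imports, and tidy formatting.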

Modified:
    openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java

Modified: openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java?rev=1232438&r1=1232437&r2=1232438&view=diff
==============================================================================
--- openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java (original)
+++ openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java Tue Jan 17 15:16:38 2012
@@ -16,14 +16,13 @@
  */
 package org.apache.openejb.server.ejbd;
 
+import org.apache.openejb.client.KeepAliveStyle;
 import org.apache.openejb.loader.SystemInstance;
 import org.apache.openejb.server.ServerService;
 import org.apache.openejb.server.ServiceException;
 import org.apache.openejb.server.ServicePool;
 import org.apache.openejb.util.LogCategory;
 import org.apache.openejb.util.Logger;
-import org.apache.openejb.util.Exceptions;
-import org.apache.openejb.client.KeepAliveStyle;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -33,8 +32,6 @@ import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Timer;
@@ -55,7 +52,7 @@ public class KeepAliveServer implements 
     private final ServerService service;
     private final long timeout = (1000 * 3);
 
-    private final AtomicBoolean stop = new AtomicBoolean();
+    private final AtomicBoolean running = new AtomicBoolean(false);
     private final KeepAliveTimer keepAliveTimer;
     private Timer timer;
 
@@ -63,16 +60,11 @@ public class KeepAliveServer implements 
         this(new EjbServer());
     }
 
-    public KeepAliveServer(ServerService service) {
+    public KeepAliveServer(final ServerService service) {
         this.service = service;
-
-        keepAliveTimer = new KeepAliveTimer();
-
-        timer = new Timer("KeepAliveTimer", true);
-        timer.scheduleAtFixedRate(keepAliveTimer, timeout, timeout / 2);
+        this.keepAliveTimer = new KeepAliveTimer();
     }
 
-
     public class KeepAliveTimer extends TimerTask {
 
         // Doesn't need to be a map.  Could be a set if Session.equals/hashCode only referenced the Thread.
@@ -80,22 +72,23 @@ public class KeepAliveServer implements 
 
         private BlockingQueue<Runnable> queue;
 
+        @Override
         public void run() {
-            if (!stop.get()) {
+            if (running.get()) {
                 closeInactiveSessions();
             }
         }
 
         private void closeInactiveSessions() {
-            BlockingQueue<Runnable> queue = getQueue();
+            final BlockingQueue<Runnable> queue = getQueue();
             if (queue == null) return;
 
             int backlog = queue.size();
             if (backlog <= 0) return;
 
-            long now = System.currentTimeMillis();
+            final long now = System.currentTimeMillis();
 
-            for (Session session : sessions.values()) {
+            for (final Session session : sessions.values()) {
 
                 if (session.usage.tryLock()) {
                     try {
@@ -104,7 +97,11 @@ public class KeepAliveServer implements 
                                 backlog--;
                                 session.socket.close();
                             } catch (IOException e) {
-                                logger.info("Error closing socket.", e);
+                                if (logger.isWarningEnabled()) {
+                                    logger.warning("closeInactiveSessions: Error closing socket. Debug for StackTrace");
+                                } else if (logger.isDebugEnabled()) {
+                                    logger.debug("closeInactiveSessions: Error closing socket.", e);
+                                }
                             } finally {
                                 removeSession(session);
                             }
@@ -121,17 +118,21 @@ public class KeepAliveServer implements 
         public void closeSessions() {
 
             // Close the ones we can
-            for (Session session : sessions.values()) {
+            for (final Session session : sessions.values()) {
                 if (session.usage.tryLock()) {
                     try {
                         session.socket.close();
                     } catch (IOException e) {
-                        logger.info("Error closing socket.", e);
+                        if (logger.isWarningEnabled()) {
+                            logger.warning("closeSessions: Error closing socket. Debug for StackTrace");
+                        } else if (logger.isDebugEnabled()) {
+                            logger.debug("closeSessions: Error closing socket.", e);
+                        }
                     } finally {
                         removeSession(session);
                         session.usage.unlock();
                     }
-                } else {
+                } else if (logger.isDebugEnabled()) {
                     logger.debug("Allowing graceful shutdown of " + session.socket.getInetAddress());
                 }
             }
@@ -140,19 +141,19 @@ public class KeepAliveServer implements 
         private BlockingQueue<Runnable> getQueue() {
             if (queue == null) {
                 // this can be null if timer fires before service is fully initialized
-                ServicePool incoming = SystemInstance.get().getComponent(ServicePool.class);
+                final ServicePool incoming = SystemInstance.get().getComponent(ServicePool.class);
                 if (incoming == null) return null;
-                ThreadPoolExecutor threadPool = incoming.getThreadPool();
+                final ThreadPoolExecutor threadPool = incoming.getThreadPool();
                 queue = threadPool.getQueue();
             }
             return queue;
         }
 
-        public Session addSession(Session session) {
+        public Session addSession(final Session session) {
             return sessions.put(session.thread, session);
         }
 
-        public Session removeSession(Session session) {
+        public Session removeSession(final Session session) {
             return sessions.remove(session.thread);
         }
     }
@@ -168,45 +169,47 @@ public class KeepAliveServer implements 
         // only used inside the Lock
         private final Socket socket;
 
-        public Session(Socket socket) {
+        public Session(final Socket socket) {
             this.socket = socket;
             this.lastRequest = System.currentTimeMillis();
             this.thread = Thread.currentThread();
         }
 
-        public void service(Socket socket) throws ServiceException, IOException {
+        public void service(final Socket socket) throws ServiceException, IOException {
             keepAliveTimer.addSession(this);
 
             int i = -1;
 
             try {
-                InputStream in = new BufferedInputStream(socket.getInputStream());
-                OutputStream out = new BufferedOutputStream(socket.getOutputStream());
+                final InputStream in = new BufferedInputStream(socket.getInputStream());
+                final OutputStream out = new BufferedOutputStream(socket.getOutputStream());
 
-                while (!stop.get()) {
+                while (running.get()) {
                     try {
                         i = in.read();
                     } catch (SocketException e) {
                         // Socket closed.
                         break;
                     }
-                    if (i == -1){
+                    if (i == -1) {
                         // client hung up
                         break;
                     }
-                    KeepAliveStyle style = KeepAliveStyle.values()[i];
+                    final KeepAliveStyle style = KeepAliveStyle.values()[i];
 
                     try {
                         usage.lock();
 
-                        switch(style){
+                        switch (style) {
                             case PING_PING: {
                                 in.read();
+                                break;
                             }
-                            break;
+
                             case PING_PONG: {
                                 out.write(style.ordinal());
                                 out.flush();
+                                break;
                             }
                         }
 
@@ -217,7 +220,7 @@ public class KeepAliveServer implements 
                         usage.unlock();
                     }
                 }
-            } catch (ArrayIndexOutOfBoundsException e){
+            } catch (ArrayIndexOutOfBoundsException e) {
                 throw new IOException("Unexpected byte " + i);
             } catch (InterruptedIOException e) {
                 Thread.interrupted();
@@ -228,58 +231,73 @@ public class KeepAliveServer implements 
     }
 
 
-    public void service(Socket socket) throws ServiceException, IOException {
-        Session session = new Session(socket);
+    @Override
+    public void service(final Socket socket) throws ServiceException, IOException {
+        final Session session = new Session(socket);
         session.service(socket);
     }
 
-    public void service(InputStream in, OutputStream out) throws ServiceException, IOException {
+    @Override
+    public void service(final InputStream in, final OutputStream out) throws ServiceException, IOException {
     }
 
+    @Override
     public String getIP() {
         return service.getIP();
     }
 
+    @Override
     public String getName() {
         return service.getName();
     }
 
+    @Override
     public int getPort() {
         return service.getPort();
     }
 
+    @Override
     public void start() throws ServiceException {
-        stop.set(false);
-
-//        service.start();
+        if (!this.running.getAndSet(true)) {
+            this.timer = new Timer("KeepAliveTimer", true);
+            this.timer.scheduleAtFixedRate(this.keepAliveTimer, this.timeout, (this.timeout / 2));
+        }
     }
 
-
+    @Override
     public void stop() throws ServiceException {
-        stop.set(true);
-        keepAliveTimer.closeSessions();
-//        service.stop();
+        if (this.running.getAndSet(false)) {
+            try {
+                this.keepAliveTimer.closeSessions();
+            } catch (Throwable e) {
+                //Ignore
+            }
+            this.timer.cancel();
+        }
     }
 
-    public void init(Properties props) throws Exception {
+    @Override
+    public void init(final Properties props) throws Exception {
         service.init(props);
     }
 
     public class Input extends java.io.FilterInputStream {
 
-        public Input(InputStream in) {
+        public Input(final InputStream in) {
             super(in);
         }
 
+        @Override
         public void close() throws IOException {
         }
     }
 
     public class Output extends java.io.FilterOutputStream {
-        public Output(OutputStream out) {
+        public Output(final OutputStream out) {
             super(out);
         }
 
+        @Override
         public void close() throws IOException {
             flush();
         }