You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by an...@apache.org on 2012/01/17 16:16:38 UTC
svn commit: r1232438 - /openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
Author: andygumbrecht
Date: Tue Jan 17 15:16:38 2012
New Revision: 1232438
URL: http://svn.apache.org/viewvc?rev=1232438&view=rev
Log:
Timer should not be started unless service is started.
Reduce log level.
Cleanup.
Modified:
openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
Modified: openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java?rev=1232438&r1=1232437&r2=1232438&view=diff
==============================================================================
--- openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java (original)
+++ openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java Tue Jan 17 15:16:38 2012
@@ -16,14 +16,13 @@
*/
package org.apache.openejb.server.ejbd;
+import org.apache.openejb.client.KeepAliveStyle;
import org.apache.openejb.loader.SystemInstance;
import org.apache.openejb.server.ServerService;
import org.apache.openejb.server.ServiceException;
import org.apache.openejb.server.ServicePool;
import org.apache.openejb.util.LogCategory;
import org.apache.openejb.util.Logger;
-import org.apache.openejb.util.Exceptions;
-import org.apache.openejb.client.KeepAliveStyle;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -33,8 +32,6 @@ import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;
@@ -55,7 +52,7 @@ public class KeepAliveServer implements
private final ServerService service;
private final long timeout = (1000 * 3);
- private final AtomicBoolean stop = new AtomicBoolean();
+ private final AtomicBoolean running = new AtomicBoolean(false);
private final KeepAliveTimer keepAliveTimer;
private Timer timer;
@@ -63,16 +60,11 @@ public class KeepAliveServer implements
this(new EjbServer());
}
- public KeepAliveServer(ServerService service) {
+ public KeepAliveServer(final ServerService service) {
this.service = service;
-
- keepAliveTimer = new KeepAliveTimer();
-
- timer = new Timer("KeepAliveTimer", true);
- timer.scheduleAtFixedRate(keepAliveTimer, timeout, timeout / 2);
+ this.keepAliveTimer = new KeepAliveTimer();
}
-
public class KeepAliveTimer extends TimerTask {
// Doesn't need to be a map. Could be a set if Session.equals/hashCode only referenced the Thread.
@@ -80,22 +72,23 @@ public class KeepAliveServer implements
private BlockingQueue<Runnable> queue;
+ @Override
public void run() {
- if (!stop.get()) {
+ if (running.get()) {
closeInactiveSessions();
}
}
private void closeInactiveSessions() {
- BlockingQueue<Runnable> queue = getQueue();
+ final BlockingQueue<Runnable> queue = getQueue();
if (queue == null) return;
int backlog = queue.size();
if (backlog <= 0) return;
- long now = System.currentTimeMillis();
+ final long now = System.currentTimeMillis();
- for (Session session : sessions.values()) {
+ for (final Session session : sessions.values()) {
if (session.usage.tryLock()) {
try {
@@ -104,7 +97,11 @@ public class KeepAliveServer implements
backlog--;
session.socket.close();
} catch (IOException e) {
- logger.info("Error closing socket.", e);
+ if (logger.isWarningEnabled()) {
+ logger.warning("closeInactiveSessions: Error closing socket. Debug for StackTrace");
+ } else if (logger.isDebugEnabled()) {
+ logger.debug("closeInactiveSessions: Error closing socket.", e);
+ }
} finally {
removeSession(session);
}
@@ -121,17 +118,21 @@ public class KeepAliveServer implements
public void closeSessions() {
// Close the ones we can
- for (Session session : sessions.values()) {
+ for (final Session session : sessions.values()) {
if (session.usage.tryLock()) {
try {
session.socket.close();
} catch (IOException e) {
- logger.info("Error closing socket.", e);
+ if (logger.isWarningEnabled()) {
+ logger.warning("closeSessions: Error closing socket. Debug for StackTrace");
+ } else if (logger.isDebugEnabled()) {
+ logger.debug("closeSessions: Error closing socket.", e);
+ }
} finally {
removeSession(session);
session.usage.unlock();
}
- } else {
+ } else if (logger.isDebugEnabled()) {
logger.debug("Allowing graceful shutdown of " + session.socket.getInetAddress());
}
}
@@ -140,19 +141,19 @@ public class KeepAliveServer implements
private BlockingQueue<Runnable> getQueue() {
if (queue == null) {
// this can be null if timer fires before service is fully initialized
- ServicePool incoming = SystemInstance.get().getComponent(ServicePool.class);
+ final ServicePool incoming = SystemInstance.get().getComponent(ServicePool.class);
if (incoming == null) return null;
- ThreadPoolExecutor threadPool = incoming.getThreadPool();
+ final ThreadPoolExecutor threadPool = incoming.getThreadPool();
queue = threadPool.getQueue();
}
return queue;
}
- public Session addSession(Session session) {
+ public Session addSession(final Session session) {
return sessions.put(session.thread, session);
}
- public Session removeSession(Session session) {
+ public Session removeSession(final Session session) {
return sessions.remove(session.thread);
}
}
@@ -168,45 +169,47 @@ public class KeepAliveServer implements
// only used inside the Lock
private final Socket socket;
- public Session(Socket socket) {
+ public Session(final Socket socket) {
this.socket = socket;
this.lastRequest = System.currentTimeMillis();
this.thread = Thread.currentThread();
}
- public void service(Socket socket) throws ServiceException, IOException {
+ public void service(final Socket socket) throws ServiceException, IOException {
keepAliveTimer.addSession(this);
int i = -1;
try {
- InputStream in = new BufferedInputStream(socket.getInputStream());
- OutputStream out = new BufferedOutputStream(socket.getOutputStream());
+ final InputStream in = new BufferedInputStream(socket.getInputStream());
+ final OutputStream out = new BufferedOutputStream(socket.getOutputStream());
- while (!stop.get()) {
+ while (running.get()) {
try {
i = in.read();
} catch (SocketException e) {
// Socket closed.
break;
}
- if (i == -1){
+ if (i == -1) {
// client hung up
break;
}
- KeepAliveStyle style = KeepAliveStyle.values()[i];
+ final KeepAliveStyle style = KeepAliveStyle.values()[i];
try {
usage.lock();
- switch(style){
+ switch (style) {
case PING_PING: {
in.read();
+ break;
}
- break;
+
case PING_PONG: {
out.write(style.ordinal());
out.flush();
+ break;
}
}
@@ -217,7 +220,7 @@ public class KeepAliveServer implements
usage.unlock();
}
}
- } catch (ArrayIndexOutOfBoundsException e){
+ } catch (ArrayIndexOutOfBoundsException e) {
throw new IOException("Unexpected byte " + i);
} catch (InterruptedIOException e) {
Thread.interrupted();
@@ -228,58 +231,73 @@ public class KeepAliveServer implements
}
- public void service(Socket socket) throws ServiceException, IOException {
- Session session = new Session(socket);
+ @Override
+ public void service(final Socket socket) throws ServiceException, IOException {
+ final Session session = new Session(socket);
session.service(socket);
}
- public void service(InputStream in, OutputStream out) throws ServiceException, IOException {
+ @Override
+ public void service(final InputStream in, final OutputStream out) throws ServiceException, IOException {
}
+ @Override
public String getIP() {
return service.getIP();
}
+ @Override
public String getName() {
return service.getName();
}
+ @Override
public int getPort() {
return service.getPort();
}
+ @Override
public void start() throws ServiceException {
- stop.set(false);
-
-// service.start();
+ if (!this.running.getAndSet(true)) {
+ this.timer = new Timer("KeepAliveTimer", true);
+ this.timer.scheduleAtFixedRate(this.keepAliveTimer, this.timeout, (this.timeout / 2));
+ }
}
-
+ @Override
public void stop() throws ServiceException {
- stop.set(true);
- keepAliveTimer.closeSessions();
-// service.stop();
+ if (this.running.getAndSet(false)) {
+ try {
+ this.keepAliveTimer.closeSessions();
+ } catch (Throwable e) {
+ //Ignore
+ }
+ this.timer.cancel();
+ }
}
- public void init(Properties props) throws Exception {
+ @Override
+ public void init(final Properties props) throws Exception {
service.init(props);
}
public class Input extends java.io.FilterInputStream {
- public Input(InputStream in) {
+ public Input(final InputStream in) {
super(in);
}
+ @Override
public void close() throws IOException {
}
}
public class Output extends java.io.FilterOutputStream {
- public Output(OutputStream out) {
+ public Output(final OutputStream out) {
super(out);
}
+ @Override
public void close() throws IOException {
flush();
}