You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@felix.apache.org by ri...@apache.org on 2009/04/25 18:57:58 UTC

svn commit: r768564 - in /felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote: Activator.java Latch.java Listener.java ServiceMediator.java Shell.java TerminalPrintStream.java

Author: rickhall
Date: Sat Apr 25 16:57:58 2009
New Revision: 768564

URL: http://svn.apache.org/viewvc?rev=768564&view=rev
Log:
Refactored remote shell to improve service handling and concurrency handling.
It probably still could be refactored even more, but I don't have the time.

Removed:
    felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/Latch.java
Modified:
    felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/Activator.java
    felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/Listener.java
    felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/ServiceMediator.java
    felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/Shell.java
    felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/TerminalPrintStream.java

Modified: felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/Activator.java
URL: http://svn.apache.org/viewvc/felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/Activator.java?rev=768564&r1=768563&r2=768564&view=diff
==============================================================================
--- felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/Activator.java (original)
+++ felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/Activator.java Sat Apr 25 16:57:58 2009
@@ -24,41 +24,29 @@
  */
 public class Activator implements BundleActivator
 {
-    private static ServiceMediator c_services;
+    private ServiceMediator m_services;
     private Listener m_listener;
 
-    public void start(BundleContext bundleContext) throws Exception
+    public void start(BundleContext context) throws Exception
     {
         //1. Prepare mediator
-        c_services = new ServiceMediator();
-        c_services.activate(bundleContext);
+        m_services = new ServiceMediator(context);
 
         //2. Prepare the listener
-        m_listener = new Listener();
-        m_listener.activate(bundleContext);
+        m_listener = new Listener(context, m_services);
     }
 
-    public void stop(BundleContext bundleContext) throws Exception
+    public void stop(BundleContext context) throws Exception
     {
         if (m_listener != null)
         {
             m_listener.deactivate();
             m_listener = null;
         }
-        if (c_services != null)
+        if (m_services != null)
         {
-            c_services.deactivate();
-            c_services = null;
+            m_services.deactivate();
+            m_services = null;
         }
     }
-
-    /**
-     * Returns a reference to the {@link ServiceMediator} instance used in this bundle.
-     *
-     * @return a {@link ServiceMediator} instance.
-     */
-    static ServiceMediator getServices()
-    {
-        return c_services;
-    }
 }
\ No newline at end of file

Modified: felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/Listener.java
URL: http://svn.apache.org/viewvc/felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/Listener.java?rev=768564&r1=768563&r2=768564&view=diff
==============================================================================
--- felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/Listener.java (original)
+++ felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/Listener.java Sat Apr 25 16:57:58 2009
@@ -33,31 +33,37 @@
  */
 class Listener
 {
-    private int m_port;
-    private String m_ip;
-    private Thread m_listenerThread;
-    private boolean m_stop = false;
-    private ServerSocket m_serverSocket;
-    private AtomicInteger m_useCounter;
-    private int m_maxConnections;
-    private int m_soTimeout;
-    private Set m_connections;
+    private final int m_port;
+    private final String m_ip;
+    private final Thread m_listenerThread;
+    private final Acceptor m_acceptor;
+    private final AtomicInteger m_useCounter;
+    private final int m_maxConnections;
+    private final int m_soTimeout;
+    private final Set m_connections;
+    private final ServiceMediator m_services;
 
     /**
      * Activates this listener on a listener thread (telnetconsole.Listener).
      */
-    public void activate(BundleContext bundleContext)
+    public Listener(BundleContext context, ServiceMediator services) throws IOException
     {
+        m_services = services;
         //configure from framework property
-        m_ip = getProperty(bundleContext, "osgi.shell.telnet.ip", "127.0.0.1");
-        m_port = getProperty(bundleContext, "osgi.shell.telnet.port", 6666);
-        m_soTimeout = getProperty(bundleContext, "osgi.shell.telnet.socketTimeout", 0);
-        m_maxConnections = getProperty(bundleContext, "osgi.shell.telnet.maxconn", 2);
+        m_ip = getProperty(context, "osgi.shell.telnet.ip", "127.0.0.1");
+        m_port = getProperty(context, "osgi.shell.telnet.port", 6666);
+        m_soTimeout = getProperty(context, "osgi.shell.telnet.socketTimeout", 0);
+        m_maxConnections = getProperty(context, "osgi.shell.telnet.maxconn", 2);
         m_useCounter = new AtomicInteger(0);
         m_connections = new HashSet();
-        m_listenerThread = new Thread(new Acceptor(), "telnetconsole.Listener");
+        m_acceptor = new Acceptor();
+        m_listenerThread = new Thread(m_acceptor, "telnetconsole.Listener");
         m_listenerThread.start();
     }//activate
+    public ServiceMediator getServices()
+    {
+        return m_services;
+    }
 
     /**
      * Deactivates this listener.
@@ -70,14 +76,13 @@
     {
         try
         {
-            m_stop = true;
             //wait for the listener thread
-            m_serverSocket.close();
+            m_acceptor.close();
             m_listenerThread.join();
         }
         catch (Exception ex)
         {
-            Activator.getServices().error("Listener::deactivate()", ex);
+            m_services.error("Listener::deactivate()", ex);
         }
 
         // get the active connections (and clear the list)
@@ -103,6 +108,21 @@
      */
     private class Acceptor implements Runnable
     {
+        private volatile boolean m_stop = false;
+        private final ServerSocket m_serverSocket;
+
+        Acceptor() throws IOException
+        {
+            m_serverSocket = new ServerSocket(m_port, 1, InetAddress.getByName(m_ip));
+            m_serverSocket.setSoTimeout(m_soTimeout);
+        }
+
+        public void close() throws IOException
+        {
+            m_stop = true;
+            m_serverSocket.close();
+        }
+
         /**
          * Listens constantly to a server socket and handles incoming connections.
          * One connection will be accepted and routed into the shell, all others will
@@ -122,8 +142,6 @@
                 should be handled properly, but denial of service attacks via massive parallel
                 program logins should be prevented with this.
                  */
-                m_serverSocket = new ServerSocket(m_port, 1, InetAddress.getByName(m_ip));
-                m_serverSocket.setSoTimeout(m_soTimeout);
                 do
                 {
                     try
@@ -161,7 +179,7 @@
             }
             catch (IOException e)
             {
-                Activator.getServices().error("Listener.Acceptor::activate()", e);
+                m_services.error("Listener.Acceptor::run()", e);
             }
         }//run
     }//inner class Acceptor
@@ -179,7 +197,7 @@
             }
             catch (NumberFormatException ex)
             {
-                Activator.getServices().error("Listener::activate()", ex);
+                m_services.error("Listener::activate()", ex);
             }
         }
 
@@ -233,4 +251,4 @@
             m_connections.remove(connection);
         }
     }
-}//class Listener
+}//class Listener
\ No newline at end of file

Modified: felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/ServiceMediator.java
URL: http://svn.apache.org/viewvc/felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/ServiceMediator.java?rev=768564&r1=768563&r2=768564&view=diff
==============================================================================
--- felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/ServiceMediator.java (original)
+++ felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/ServiceMediator.java Sat Apr 25 16:57:58 2009
@@ -25,17 +25,42 @@
 import org.osgi.framework.ServiceListener;
 import org.osgi.framework.ServiceReference;
 import org.osgi.service.log.LogService;
+import org.osgi.util.tracker.ServiceTracker;
 
 /**
  * Implements a mediator pattern class for services from the OSGi container.
  */
 class ServiceMediator
 {
-    private BundleContext m_bundleContext;
-    private ShellService m_felixShellService;
-    private Latch m_felixShellServiceLatch;
-    private LogService m_logService;
-    private Latch m_logServiceLatch;
+    private final String m_bundleName;
+    private final long m_bundleId;
+    private final BundleContext m_context;
+    private final ServiceTracker m_logTracker;
+    private final ServiceTracker m_shellTracker;
+
+    ServiceMediator(BundleContext context)
+    {
+        m_context = context;
+        m_bundleName = (m_context.getBundle().getSymbolicName() == null)
+            ? m_context.getBundle().getLocation()
+            : m_context.getBundle().getSymbolicName();
+        m_bundleId = m_context.getBundle().getBundleId();
+        ServiceTracker logTracker = null;
+        try
+        {
+            logTracker = new ServiceTracker(m_context, LogService.class.getName(), null);
+            logTracker.open();
+        }
+        catch (Throwable ex)
+        {
+            // This means we don't have access to the log service package since it
+            // is optional, so don't track log services.
+            logTracker = null;
+        }
+        m_logTracker = logTracker;
+        m_shellTracker = new ServiceTracker(m_context, ShellService.class.getName(), null);
+        m_shellTracker.open();
+    }
 
     /**
      * Returns a reference to the <tt>ShellService</tt> (Felix).
@@ -45,15 +70,16 @@
      */
     public ShellService getFelixShellService(long wait)
     {
+        ShellService shell = null;
         try
         {
             if (wait < 0)
             {
-                m_felixShellServiceLatch.acquire();
+                shell = (ShellService) m_shellTracker.getService();
             }
-            else if (wait > 0)
+            else
             {
-                m_felixShellServiceLatch.attempt(wait);
+                shell = (ShellService) m_shellTracker.waitForService(wait);
             }
         }
         catch (InterruptedException e)
@@ -61,34 +87,39 @@
             e.printStackTrace(System.err);
         }
 
-        return m_felixShellService;
+        return shell;
     }//getFelixShellService
 
-    public LogService getLogServiceLatch(long wait)
+    public Object getLogServiceLatch(long wait)
     {
-        try
+        Object log = null;
+        if (m_logTracker != null)
         {
-            if (wait < 0)
+            try
             {
-                m_logServiceLatch.acquire();
+                if (wait < 0)
+                {
+                    log = m_logTracker.getService();
+                }
+                else
+                {
+                    log = m_logTracker.waitForService(wait);
+                }
             }
-            else if (wait > 0)
+            catch (InterruptedException e)
             {
-                m_logServiceLatch.attempt(wait);
+                e.printStackTrace(System.err);
             }
         }
-        catch (InterruptedException e)
-        {
-            e.printStackTrace(System.err);
-        }
-        return m_logService;
+        return log;
     }//getLogService
 
     public void info(String msg)
     {
-        if (m_logService != null)
+        Object log = getLogServiceLatch(NO_WAIT);
+        if (log != null)
         {
-            m_logService.log(LogService.LOG_INFO, msg);
+            ((LogService) log).log(LogService.LOG_INFO, msg);
         }
         else
         {
@@ -98,9 +129,10 @@
 
     public void error(String msg, Throwable t)
     {
-        if (m_logService != null)
+        Object log = getLogServiceLatch(NO_WAIT);
+        if (log != null)
         {
-            m_logService.log(LogService.LOG_ERROR, msg, t);
+            ((LogService) log).log(LogService.LOG_ERROR, msg, t);
         }
         else
         {
@@ -110,9 +142,10 @@
 
     public void error(String msg)
     {
-        if (m_logService != null)
+        Object log = getLogServiceLatch(NO_WAIT);
+        if (log != null)
         {
-            m_logService.log(LogService.LOG_ERROR, msg);
+            ((LogService) log).log(LogService.LOG_ERROR, msg);
         }
         else
         {
@@ -122,9 +155,10 @@
 
     public void debug(String msg)
     {
-        if (m_logService != null)
+        Object log = getLogServiceLatch(NO_WAIT);
+        if (log != null)
         {
-            m_logService.log(LogService.LOG_DEBUG, msg);
+            ((LogService) log).log(LogService.LOG_DEBUG, msg);
         }
         else
         {
@@ -134,9 +168,10 @@
 
     public void warn(String msg)
     {
-        if (m_logService != null)
+        Object log = getLogServiceLatch(NO_WAIT);
+        if (log != null)
         {
-            m_logService.log(LogService.LOG_WARNING, msg);
+            ((LogService) log).log(LogService.LOG_WARNING, msg);
         }
         else
         {
@@ -148,10 +183,9 @@
     {
         //Assemble String
         StringBuffer sbuf = new StringBuffer();
-        Bundle b = m_bundleContext.getBundle();
-        sbuf.append(b.getHeaders().get(Constants.BUNDLE_NAME));
+        sbuf.append(m_bundleName);
         sbuf.append(" [");
-        sbuf.append(b.getBundleId());
+        sbuf.append(m_bundleId);
         sbuf.append("] ");
         sbuf.append(msg);
         System.out.println(sbuf.toString());
@@ -161,10 +195,9 @@
     {
         //Assemble String
         StringBuffer sbuf = new StringBuffer();
-        Bundle b = m_bundleContext.getBundle();
-        sbuf.append(b.getHeaders().get(Constants.BUNDLE_NAME));
+        sbuf.append(m_bundleName);
         sbuf.append(" [");
-        sbuf.append(b.getBundleId());
+        sbuf.append(m_bundleId);
         sbuf.append("] ");
         sbuf.append(msg);
         System.err.println(sbuf.toString());
@@ -175,136 +208,19 @@
     }//logToSystem
 
     /**
-     * Activates this mediator to start tracking the required services using the
-     * OSGi service layer.
-     *
-     * @param bc the bundle's context.
-     * @return true if activated successfully, false otherwise.
-     */
-    public boolean activate(BundleContext bc)
-    {
-        //get the context
-        m_bundleContext = bc;
-
-        m_felixShellServiceLatch = createWaitLatch();
-        m_logServiceLatch = createWaitLatch();
-
-        //prepareDefinitions listener
-        ServiceListener serviceListener = new ServiceListenerImpl();
-
-        //prepareDefinitions the filter, ShellService is required,
-        //LogService may be missing, in which case we only use the
-        // ShellService part of the filter
-        String filter = "(objectclass=" + ShellService.class.getName() + ")";
-        try
-        {
-            filter = "(|" + filter + "(objectclass=" + LogService.class.getName() + "))";
-        }
-        catch (Throwable t)
-        {
-            // ignore
-        }
-
-        try
-        {
-            //add the listener to the bundle context.
-            bc.addServiceListener(serviceListener, filter);
-
-            //ensure that already registered Service instances are registered with
-            //the manager
-            ServiceReference[] srl = bc.getServiceReferences(null, filter);
-            for (int i = 0; srl != null && i < srl.length; i++)
-            {
-                serviceListener.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, srl[i]));
-            }
-        }
-        catch (InvalidSyntaxException ex)
-        {
-            ex.printStackTrace(System.err);
-            return false;
-        }
-        return true;
-    }//activate
-
-    /**
      * Deactivates this mediator, nulling out all references.
      * If called when the bundle is stopped, the framework should actually take
      * care of unregistering the <tt>ServiceListener</tt>.
      */
     public void deactivate()
     {
-        m_felixShellService = null;
-        m_felixShellServiceLatch = null;
-        m_bundleContext = null;
-    }//deactivate
-
-    /**
-     * Creates a simple wait latch to be used for the mechanism that allows entities
-     * in the bundles to wait for references.
-     *
-     * @return a new Latch instance.
-     */
-    private Latch createWaitLatch()
-    {
-        return new Latch();
-    }//createWaitLatch
-
-    /**
-     * The <tt>ServiceListener</tt> implementation.
-     */
-    private class ServiceListenerImpl implements ServiceListener
-    {
-        public void serviceChanged(ServiceEvent ev)
+        if (m_logTracker != null)
         {
-            ServiceReference sr = ev.getServiceReference();
-            Object o = null;
-            switch (ev.getType())
-            {
-                case ServiceEvent.REGISTERED:
-                    o = m_bundleContext.getService(sr);
-                    if (o == null)
-                    {
-                        return;
-                    }
-                    else if (o instanceof ShellService)
-                    {
-                        m_felixShellService = (ShellService) o;
-                        m_felixShellServiceLatch.release();
-                    }
-                    else if (o instanceof LogService)
-                    {
-                        m_logService = (LogService) o;
-                        m_logServiceLatch.release();
-                    }
-                    else
-                    {
-                        m_bundleContext.ungetService(sr);
-                    }
-                    break;
-                case ServiceEvent.UNREGISTERING:
-                    o = m_bundleContext.getService(sr);
-                    if (o == null)
-                    {
-                        return;
-                    }
-                    else if (o instanceof ShellService)
-                    {
-                        m_felixShellService = null;
-                        m_felixShellServiceLatch = createWaitLatch();
-                    }
-                    else if (o instanceof LogService)
-                    {
-                        m_logService = null;
-                        m_logServiceLatch = createWaitLatch();
-                    }
-                    else
-                    {
-                        m_bundleContext.ungetService(sr);
-                    }
-                    break;
-            }
+            m_logTracker.close();
         }
-    }//inner class ServiceListenerImpl
-    public static long WAIT_UNLIMITED = -1;
-    public static long NO_WAIT = 0;
-}//class ServiceMediator
+        m_shellTracker.close();
+    }//deactivate
+
+    public static long WAIT_UNLIMITED = 0;
+    public static long NO_WAIT = -1;
+}//class ServiceMediator
\ No newline at end of file

Modified: felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/Shell.java
URL: http://svn.apache.org/viewvc/felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/Shell.java?rev=768564&r1=768563&r2=768564&view=diff
==============================================================================
--- felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/Shell.java (original)
+++ felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/Shell.java Sat Apr 25 16:57:58 2009
@@ -37,9 +37,10 @@
  */
 class Shell implements Runnable
 {
-    private Listener m_owner;
-    private Socket m_socket;
-    private AtomicInteger m_useCounter;
+    private final Listener m_owner;
+    private final Socket m_socket;
+    private final AtomicInteger m_useCounter;
+    private volatile TerminalPrintStream m_out;
 
     public Shell(Listener owner, Socket s, AtomicInteger counter)
     {
@@ -63,27 +64,29 @@
 
         try
         {
-            PrintStream out = new TerminalPrintStream(m_socket.getOutputStream());
-            BufferedReader in = new BufferedReader(new TerminalReader(m_socket.getInputStream(), out));
+            m_out = new TerminalPrintStream(
+                m_owner.getServices(), m_socket.getOutputStream());
+            BufferedReader in = new BufferedReader(
+                new TerminalReader(m_socket.getInputStream(), m_out));
             ReentrantLock lock = new ReentrantLock();
 
             // Print welcome banner.
-            out.println();
-            out.println("Felix Remote Shell Console:");
-            out.println("============================");
-            out.println("");
+            m_out.println();
+            m_out.println("Felix Remote Shell Console:");
+            m_out.println("============================");
+            m_out.println("");
 
             do
             {
-                out.print("-> ");
                 String line = "";
                 try
                 {
+                    m_out.print("-> ");
                     line = in.readLine();
                     //make sure to capture end of stream
                     if (line == null)
                     {
-                        out.println("exit");
+                        m_out.println("exit");
                         return;
                     }
                 }
@@ -98,15 +101,15 @@
                     return;
                 }
 
-                ShellService shs = Activator.getServices().getFelixShellService(ServiceMediator.NO_WAIT);
+                ShellService shs = m_owner.getServices().getFelixShellService(ServiceMediator.NO_WAIT);
                 try
                 {
                     lock.acquire();
-                    shs.executeCommand(line, out, out);
+                    shs.executeCommand(line, m_out, m_out);
                 }
                 catch (Exception ex)
                 {
-                    Activator.getServices().error("Shell::run()", ex);
+                    m_owner.getServices().error("Shell::run()", ex);
                 }
                 finally
                 {
@@ -117,7 +120,7 @@
         }
         catch (IOException ex)
         {
-            Activator.getServices().error("Shell::run()", ex);
+            m_owner.getServices().error("Shell::run()", ex);
         }
         finally
         {
@@ -129,20 +132,12 @@
     private void exit(String message)
     {
         // farewell message
-        try
-        {
-            PrintStream out = new TerminalPrintStream(m_socket.getOutputStream());
-            if (message != null)
-            {
-                out.println(message);
-            }
-            out.println("Good Bye!");
-            out.close();
-        }
-        catch (IOException ioe)
+        if (message != null)
         {
-            // ignore
+            m_out.println(message);
         }
+        m_out.println("Good Bye!");
+        m_out.close();
 
         try
         {
@@ -150,7 +145,7 @@
         }
         catch (IOException ex)
         {
-            Activator.getServices().error("Shell::exit()", ex);
+            m_owner.getServices().error("Shell::exit()", ex);
         }
         m_owner.unregisterConnection(this);
         m_useCounter.decrement();

Modified: felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/TerminalPrintStream.java
URL: http://svn.apache.org/viewvc/felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/TerminalPrintStream.java?rev=768564&r1=768563&r2=768564&view=diff
==============================================================================
--- felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/TerminalPrintStream.java (original)
+++ felix/trunk/shell.remote/src/main/java/org/apache/felix/shell/remote/TerminalPrintStream.java Sat Apr 25 16:57:58 2009
@@ -19,20 +19,25 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
+import java.net.SocketException;
 
 /**
  * Class implementing a <tt>TerminalPrintStream</tt>.
  */
 class TerminalPrintStream extends PrintStream
 {
+    private final ServiceMediator m_services;
+    private volatile boolean m_isClosed = false;
+
     /**
      * Constructs a new instance wrapping the given <tt>OutputStream</tt>.
      *
      * @param tout the <tt>OutputStream</tt> to be wrapped.
      */
-    public TerminalPrintStream(OutputStream tout)
+    public TerminalPrintStream(ServiceMediator services, OutputStream tout)
     {
         super(tout);
+        m_services = services;
     }//constructor
 
     public void print(String str)
@@ -43,9 +48,12 @@
             out.write(bytes, 0, bytes.length);
             flush();
         }
-        catch (IOException ex)
+        catch (Exception ex)
         {
-            Activator.getServices().error("TerminalPrintStream::print()", ex);
+            if (!m_isClosed)
+            {
+                m_services.error("TerminalPrintStream::print()", ex);
+            }
         }
     }//print
 
@@ -62,7 +70,13 @@
         }
         catch (IOException ex)
         {
-            Activator.getServices().error("TerminalPrintStream::println()", ex);
+            m_services.error("TerminalPrintStream::println()", ex);
         }
     }//flush
+
+    public void close()
+    {
+        m_isClosed = true;
+        super.close();
+    }
 }//class TerminalPrintStream