You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/02 19:54:41 UTC

svn commit: r886254 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/net/ test/unit/org/apache/cassandra/ test/unit/org/apache/cassandra/net/ test/unit/org/apach...

Author: jbellis
Date: Wed Dec  2 18:54:07 2009
New Revision: 886254

URL: http://svn.apache.org/viewvc?rev=886254&view=rev
Log:
improve testing of StorageService init, and fix shutdown of messagingservice.
patch by gdusbabek; reviewed by jbellis for CASSANDRA-535

Added:
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/NetPackageAccessor.java   (with props)
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceTest.java   (with props)
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectorManager.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=886254&r1=886253&r2=886254&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Wed Dec  2 18:54:07 2009
@@ -19,6 +19,7 @@
 package org.apache.cassandra.concurrent;
 
 import java.lang.management.ManagementFactory;
+import java.util.List;
 import java.util.concurrent.*;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -34,6 +35,7 @@
 public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements DebuggableThreadPoolExecutorMBean
 {
     private static Logger logger_ = Logger.getLogger(DebuggableThreadPoolExecutor.class);
+    private final String mbeanName;
 
     public DebuggableThreadPoolExecutor(String threadPoolName) 
     {
@@ -51,9 +53,10 @@
         super.prestartAllCoreThreads();
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        mbeanName = "org.apache.cassandra.concurrent:type=" + threadFactory.id;
         try
         {
-            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.concurrent:type=" + threadFactory.id));
+            mbs.registerMBean(this, new ObjectName(mbeanName));
         }
         catch (Exception e)
         {
@@ -84,6 +87,33 @@
         }
     }
 
+    private void unregisterMBean()
+    {
+        try
+        {
+            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(mbeanName));
+        }
+        catch (Exception ex)
+        {
+            // don't let it get in the way, but notify.
+            logger_.error(ex.getMessage(), ex);
+        }
+    }
+
+    @Override
+    public void shutdown()
+    {
+        unregisterMBean();
+        super.shutdown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow()
+    {
+        unregisterMBean();
+        return super.shutdownNow();
+    }
+
     /**
      * Get the number of completed tasks
      */

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=886254&r1=886253&r2=886254&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Wed Dec  2 18:54:07 2009
@@ -122,7 +122,7 @@
         return gossiper_;
     }
 
-    private Timer gossipTimer_ = new Timer(false);
+    private Timer gossipTimer_;
     private InetAddress localEndPoint_;
     private long aVeryLongTime_;
     private Random random_ = new Random();
@@ -144,6 +144,7 @@
 
     private Gossiper()
     {
+        gossipTimer_ = new Timer(false);
         aVeryLongTime_ = 259200 * 1000;
         /* register with the Failure Detector for receiving Failure detector events */
         FailureDetector.instance().registerFailureDetectionEventListener(this);
@@ -834,6 +835,7 @@
     public void stop()
     {
         gossipTimer_.cancel();
+        gossipTimer_ = new Timer(false); // makes the Gossiper reentrant.
     }
 }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=886254&r1=886253&r2=886254&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Dec  2 18:54:07 2009
@@ -69,6 +69,9 @@
     
     /* List of sockets we are listening on */
     private static Map<InetAddress, SelectionKey> listenSockets_ = new HashMap<InetAddress, SelectionKey>();
+
+    /* List of UdpConnections we are listening on */
+    private static Map<InetAddress, UdpConnection> udpConnections_ = new HashMap<InetAddress, UdpConnection>();
     
     /* Lookup table for registering message handlers based on the verb. */
     private static Map<String, IVerbHandler> verbHandlers_;
@@ -214,7 +217,8 @@
         try
         {
             connection.init(localEp);
-            endPoints_.add(localEp);     
+            endPoints_.add(localEp);
+            udpConnections_.put(localEp, connection);
         }
         catch ( IOException e )
         {
@@ -497,7 +501,7 @@
         logger_.info("Shutting down ...");
         synchronized (MessagingService.class)
         {
-            /* Stop listening on any socket */
+            /* Stop listening on any TCP socket */
             for (SelectionKey skey : listenSockets_.values())
             {
                 skey.cancel();
@@ -509,6 +513,13 @@
             }
             listenSockets_.clear();
 
+            /* Stop listening on any UDP ports. */
+            for (UdpConnection con : udpConnections_.values())
+            {
+                con.close();
+            }
+            udpConnections_.clear();
+
             /* Shutdown the threads in the EventQueue's */
             messageDeserializationExecutor_.shutdownNow();
             messageDeserializerExecutor_.shutdownNow();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectorManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectorManager.java?rev=886254&r1=886253&r2=886254&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectorManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectorManager.java Wed Dec  2 18:54:07 2009
@@ -35,6 +35,9 @@
     // workaround JDK select/register bug
     Object gate = new Object();
 
+    // flag to indicate that shutdown has been requested.
+    private boolean shutdownRequested = false;
+
     // The static selector manager which is used by all applications
     private static SelectorManager manager;
     
@@ -82,7 +85,13 @@
             selector.wakeup();
             return channel.register(selector, ops, handler);
         }
-    }      
+    }
+
+    // requests the thread to shutdown. However, it brings no guarantees. Added for testing.
+    private void requestShutdown()
+    {
+        shutdownRequested = true;
+    }
 
     /**
      * This method starts the socket manager listening for events. It is
@@ -102,6 +111,11 @@
             {
                 throw new RuntimeException(e);
             }
+            if (shutdownRequested)
+            {
+                shutdownRequested = false;
+                break;
+            }
         }
     }
 
@@ -145,6 +159,22 @@
     }
 
     /**
+     * Intended to reset the singleton as part of testing.
+     */
+    static void reset()
+    {
+        synchronized(SelectorManager.class)
+        {
+            if (manager != null)
+                manager.requestShutdown();
+            manager = null;
+            if (udpManager != null)
+                udpManager.requestShutdown();
+            udpManager = null;
+        }
+    }
+
+    /**
      * Returns the SelectorManager applications should use.
      * 
      * @return The SelectorManager which applications should use

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java?rev=886254&r1=886253&r2=886254&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java Wed Dec  2 18:54:07 2009
@@ -19,6 +19,7 @@
 package org.apache.cassandra;
 
 import java.io.File;
+import java.io.IOException;
 
 import org.junit.BeforeClass;
 
@@ -30,6 +31,13 @@
     private static Logger logger = Logger.getLogger(CleanupHelper.class);
 
     @BeforeClass
+    public static void cleanupAndLeaveDirs()
+    {
+        mkdirs();
+        cleanup();
+        mkdirs();
+    }
+
     public static void cleanup()
     {
         // we clean the fs twice, once to start with (so old data files don't get stored by anything static if this is the first run)
@@ -38,6 +46,10 @@
                 DatabaseDescriptor.getLogFileLocation(),
         };
 
+        // try to delete the directories themselves too. don't panic if this fails. it probably means that the process
+        // doesn't have permissions to do so, or it contains non-cassandra generated files that were intentionally
+        // put there.
+
         for (String dirName : directoryNames)
         {
             File dir = new File(dirName);
@@ -47,11 +59,15 @@
             }
             for (File f : dir.listFiles())
             {
-                if (!f.delete()) {
+                if (!f.delete())
+                {
                     logger.error("could not delete " + f);
+                }
             }
+
+            if (!dir.delete())
+                logger.warn("could not delete " + dir.getPath());
         }
-    }
 
         // cleanup data directory which are stored as data directory/table/data files
         for (String dirName : DatabaseDescriptor.getAllDataFileLocations())
@@ -71,8 +87,24 @@
                         }
                     }
                 }
+                if (!tableFile.delete())
+                    logger.warn("could not delete " + dir.getPath());
             }
+
+            if (!dir.delete())
+                logger.warn("could not delete " + dir.getPath());
         }
+    }
 
+    public static void mkdirs()
+    {
+        try
+        {
+            DatabaseDescriptor.createAllDirectories();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 }

Added: incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/NetPackageAccessor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/NetPackageAccessor.java?rev=886254&view=auto
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/NetPackageAccessor.java (added)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/NetPackageAccessor.java Wed Dec  2 18:54:07 2009
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+/**
+    For accesing package-level members created for the sole purpose of testing.
+ */
+public class NetPackageAccessor
+{
+    public static void resetSelectorManager()
+    {
+        SelectorManager.reset();
+    }
+}

Propchange: incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/NetPackageAccessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceTest.java?rev=886254&view=auto
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceTest.java (added)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceTest.java Wed Dec  2 18:54:07 2009
@@ -0,0 +1,68 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.NetPackageAccessor;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.io.File;
+import java.io.IOException;
+
+public class StorageServiceTest
+{
+    @Test
+    public void testClientOnlyMode() throws IOException
+    {
+        CleanupHelper.mkdirs();
+        CleanupHelper.cleanup();
+        StorageService.instance().initClient();
+
+        // verify that no storage directories were created.
+        for (String path : DatabaseDescriptor.getAllDataFileLocations())
+        {
+            assertFalse(new File(path).exists());
+        }
+        StorageService.instance().stopClient();
+        NetPackageAccessor.resetSelectorManager();
+    }
+
+    @Test
+    public void testRegularMode() throws IOException, InterruptedException
+    {
+        CleanupHelper.mkdirs();
+        CleanupHelper.cleanup();
+        StorageService.instance().initServer();
+        for (String path : DatabaseDescriptor.getAllDataFileLocations())
+        {
+            // verify that storage directories are there.
+            assertTrue(new File(path).exists());
+        }
+        // a proper test would be to call decommission here, but decommission() mixes both shutdown and datatransfer
+        // calls.  This test is only interested in the shutdown-related items which a properly handled by just
+        // stopping the client.
+        //StorageService.instance().decommission();
+        StorageService.instance().stopClient();
+        NetPackageAccessor.resetSelectorManager();
+    }
+}

Propchange: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceTest.java
------------------------------------------------------------------------------
    svn:eol-style = native