You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/21 00:38:21 UTC

svn commit: r901437 - in /incubator/cassandra/trunk: conf/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ test/conf/ test/unit/org/apache/cassandra/net/ test/unit/org/apache/cassandra/service/

Author: jbellis
Date: Wed Jan 20 23:38:20 2010
New Revision: 901437

URL: http://svn.apache.org/viewvc?rev=901437&view=rev
Log:
convert gossip to use tcp.  this finishes the removal of nio
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-705

Added:
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java   (with props)
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java   (contents, props changed)
      - copied, changed from r901436, incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceTest.java
Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectionKeyHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectorManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/NetPackageAccessor.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceTest.java
Modified:
    incubator/cassandra/trunk/conf/storage-conf.xml
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/test/conf/storage-conf.xml

Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=901437&r1=901436&r2=901437&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Wed Jan 20 23:38:20 2010
@@ -221,10 +221,8 @@
    ~ address associated with the hostname (it might not be).
   -->
   <ListenAddress>localhost</ListenAddress>
-  <!-- TCP port, for commands and data -->
+  <!-- internal communications port -->
   <StoragePort>7000</StoragePort>
-  <!-- UDP port, for membership communications (gossip) -->
-  <ControlPort>7001</ControlPort>
 
   <!--
    ~ The address to bind the Thrift RPC service to. Unlike ListenAddress

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=901437&r1=901436&r2=901437&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Wed Jan 20 23:38:20 2010
@@ -308,7 +308,7 @@
         InetAddress to = liveEndPoints.get(index);
         if (logger_.isTraceEnabled())
             logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
-        MessagingService.instance.sendUdpOneWay(message, to);
+        MessagingService.instance.sendOneWay(message, to);
         return seeds_.contains(to);
     }
 
@@ -887,7 +887,7 @@
             Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
             if (logger_.isTraceEnabled())
                 logger_.trace("Sending a GossipDigestAckMessage to " + from);
-            MessagingService.instance.sendUdpOneWay(gDigestAckMessage, from);
+            MessagingService.instance.sendOneWay(gDigestAckMessage, from);
         }
         catch (IOException e)
         {
@@ -979,7 +979,7 @@
             Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
             if (logger_.isTraceEnabled())
                 logger_.trace("Sending a GossipDigestAck2Message to " + from);
-            MessagingService.instance.sendUdpOneWay(gDigestAck2Message, from);
+            MessagingService.instance.sendOneWay(gDigestAck2Message, from);
         }
         catch ( IOException e )
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901437&r1=901436&r2=901437&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 20 23:38:20 2010
@@ -166,26 +166,7 @@
             }
         }, "ACCEPT-" + localEp).start();
     }
-    
-    /**
-     * Listen on the specified port.
-     * @param localEp InetAddress whose port to listen on.
-     */
-    public void listenUDP(InetAddress localEp)
-    {
-        UdpConnection connection = new UdpConnection();
-        if (logger_.isDebugEnabled())
-          logger_.debug("Starting to listen on " + localEp);
-        try
-        {
-            connection.init(localEp);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-    
+
     public static OutboundTcpConnectionPool getConnectionPool(InetAddress from, InetAddress to)
     {
         String key = from + ":" + to;
@@ -345,36 +326,6 @@
     }
     
     /**
-     * Send a message to a given endpoint. This method adheres to the fire and forget
-     * style messaging.
-     * @param message messages to be sent.
-     * @param to endpoint to which the message needs to be sent
-     */
-    public void sendUdpOneWay(Message message, InetAddress to)
-    {
-        if (message.getFrom().equals(to)) {
-            MessagingService.receive(message);
-            return;
-        }
-        
-        UdpConnection connection = null;
-        try
-        {
-            connection = new UdpConnection(); 
-            connection.init();            
-            connection.write(message, to);            
-        }            
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        finally
-        {
-            if ( connection != null )
-                connection.close();
-        }
-    }
-    /**
      * Stream a file from source to destination. This is highly optimized
      * to not hold any of the contents of the file in memory.
      * @param file name of file to stream.

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=901437&r1=901436&r2=901437&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Wed Jan 20 23:38:20 2010
@@ -47,7 +47,8 @@
      */
     synchronized OutboundTcpConnection getConnection(Message msg)
     {
-        if (StageManager.RESPONSE_STAGE.equals(msg.getMessageType()))
+        if (StageManager.RESPONSE_STAGE.equals(msg.getMessageType())
+            || StageManager.GOSSIP_STAGE.equals(msg.getMessageType()))
         {
             if (ackCon == null)
                 ackCon = newCon();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=901437&r1=901436&r2=901437&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Jan 20 23:38:20 2010
@@ -232,11 +232,6 @@
         isClientMode = true;
         logger_.info("Starting up client gossip");
         MessagingService.instance.listen(FBUtilities.getLocalAddress());
-        MessagingService.instance.listenUDP(FBUtilities.getLocalAddress());
-
-        SelectorManager.getSelectorManager().start();
-        SelectorManager.getUdpSelectorManager().start();
-
         Gossiper.instance.register(this);
         Gossiper.instance.start(FBUtilities.getLocalAddress(), (int)(System.currentTimeMillis() / 1000)); // needed for node-ring gathering.
     }
@@ -248,13 +243,7 @@
         DatabaseDescriptor.createAllDirectories();
         logger_.info("Starting up server gossip");
 
-        /* Listen for application messages */
         MessagingService.instance.listen(FBUtilities.getLocalAddress());
-        /* Listen for control messages */
-        MessagingService.instance.listenUDP(FBUtilities.getLocalAddress());
-
-        SelectorManager.getSelectorManager().start();
-        SelectorManager.getUdpSelectorManager().start();
 
         StorageLoadBalancer.instance.startBroadcasting();
 

Modified: incubator/cassandra/trunk/test/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/conf/storage-conf.xml?rev=901437&r1=901436&r2=901437&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/test/conf/storage-conf.xml Wed Jan 20 23:38:20 2010
@@ -29,7 +29,6 @@
    <RpcTimeoutInMillis>5000</RpcTimeoutInMillis>
    <ListenAddress>127.0.0.1</ListenAddress>
    <StoragePort>7010</StoragePort>
-   <ControlPort>7011</ControlPort>
    <ThriftPort>9170</ThriftPort>
    <ColumnIndexSizeInKB>4</ColumnIndexSizeInKB>
    <CommitLogDirectory>build/test/cassandra/commitlog</CommitLogDirectory>

Added: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java?rev=901437&view=auto
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java (added)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java Wed Jan 20 23:38:20 2010
@@ -0,0 +1,47 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.io.File;
+import java.io.IOException;
+
+public class StorageServiceClientTest
+{
+    @Test
+    public void testClientOnlyMode() throws IOException
+    {
+        CleanupHelper.mkdirs();
+        CleanupHelper.cleanup();
+        StorageService.instance.initClient();
+
+        // verify that no storage directories were created.
+        for (String path : DatabaseDescriptor.getAllDataFileLocations())
+        {
+            assertFalse(new File(path).exists());
+        }
+        StorageService.instance.stopClient();
+    }
+}

Propchange: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java (from r901436, incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceTest.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java?p2=incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java&p1=incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceTest.java&r1=901436&r2=901437&rev=901437&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java Wed Jan 20 23:38:20 2010
@@ -19,34 +19,19 @@
 
 package org.apache.cassandra.service;
 
-import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.NetPackageAccessor;
-import org.junit.Test;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-
 import java.io.File;
 import java.io.IOException;
 
-public class StorageServiceTest
-{
-    @Test
-    public void testClientOnlyMode() throws IOException
-    {
-        CleanupHelper.mkdirs();
-        CleanupHelper.cleanup();
-        StorageService.instance.initClient();
+import org.junit.Test;
 
-        // verify that no storage directories were created.
-        for (String path : DatabaseDescriptor.getAllDataFileLocations())
-        {
-            assertFalse(new File(path).exists());
-        }
-        StorageService.instance.stopClient();
-        NetPackageAccessor.resetSelectorManager();
-    }
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.DatabaseDescriptor;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class StorageServiceServerTest
+{
     @Test
     public void testRegularMode() throws IOException, InterruptedException
     {
@@ -63,6 +48,5 @@
         // stopping the client.
         //StorageService.instance.decommission();
         StorageService.instance.stopClient();
-        NetPackageAccessor.resetSelectorManager();
     }
-}
+}
\ No newline at end of file

Propchange: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native