You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/21 00:38:21 UTC
svn commit: r901437 - in /incubator/cassandra/trunk: conf/
src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/net/
src/java/org/apache/cassandra/service/ test/conf/
test/unit/org/apache/cassandra/net/ test/unit/org/apache/cassandra/service/
Author: jbellis
Date: Wed Jan 20 23:38:20 2010
New Revision: 901437
URL: http://svn.apache.org/viewvc?rev=901437&view=rev
Log:
Convert gossip to use TCP. This finishes the removal of NIO.
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-705
Added:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java (with props)
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java (contents, props changed)
- copied, changed from r901436, incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceTest.java
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectionKeyHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectorManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/NetPackageAccessor.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceTest.java
Modified:
incubator/cassandra/trunk/conf/storage-conf.xml
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/test/conf/storage-conf.xml
Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=901437&r1=901436&r2=901437&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Wed Jan 20 23:38:20 2010
@@ -221,10 +221,8 @@
~ address associated with the hostname (it might not be).
-->
<ListenAddress>localhost</ListenAddress>
- <!-- TCP port, for commands and data -->
+ <!-- internal communications port -->
<StoragePort>7000</StoragePort>
- <!-- UDP port, for membership communications (gossip) -->
- <ControlPort>7001</ControlPort>
<!--
~ The address to bind the Thrift RPC service to. Unlike ListenAddress
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=901437&r1=901436&r2=901437&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Wed Jan 20 23:38:20 2010
@@ -308,7 +308,7 @@
InetAddress to = liveEndPoints.get(index);
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
- MessagingService.instance.sendUdpOneWay(message, to);
+ MessagingService.instance.sendOneWay(message, to);
return seeds_.contains(to);
}
@@ -887,7 +887,7 @@
Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestAckMessage to " + from);
- MessagingService.instance.sendUdpOneWay(gDigestAckMessage, from);
+ MessagingService.instance.sendOneWay(gDigestAckMessage, from);
}
catch (IOException e)
{
@@ -979,7 +979,7 @@
Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestAck2Message to " + from);
- MessagingService.instance.sendUdpOneWay(gDigestAck2Message, from);
+ MessagingService.instance.sendOneWay(gDigestAck2Message, from);
}
catch ( IOException e )
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901437&r1=901436&r2=901437&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 20 23:38:20 2010
@@ -166,26 +166,7 @@
}
}, "ACCEPT-" + localEp).start();
}
-
- /**
- * Listen on the specified port.
- * @param localEp InetAddress whose port to listen on.
- */
- public void listenUDP(InetAddress localEp)
- {
- UdpConnection connection = new UdpConnection();
- if (logger_.isDebugEnabled())
- logger_.debug("Starting to listen on " + localEp);
- try
- {
- connection.init(localEp);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
+
public static OutboundTcpConnectionPool getConnectionPool(InetAddress from, InetAddress to)
{
String key = from + ":" + to;
@@ -345,36 +326,6 @@
}
/**
- * Send a message to a given endpoint. This method adheres to the fire and forget
- * style messaging.
- * @param message messages to be sent.
- * @param to endpoint to which the message needs to be sent
- */
- public void sendUdpOneWay(Message message, InetAddress to)
- {
- if (message.getFrom().equals(to)) {
- MessagingService.receive(message);
- return;
- }
-
- UdpConnection connection = null;
- try
- {
- connection = new UdpConnection();
- connection.init();
- connection.write(message, to);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- finally
- {
- if ( connection != null )
- connection.close();
- }
- }
- /**
* Stream a file from source to destination. This is highly optimized
* to not hold any of the contents of the file in memory.
* @param file name of file to stream.
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=901437&r1=901436&r2=901437&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Wed Jan 20 23:38:20 2010
@@ -47,7 +47,8 @@
*/
synchronized OutboundTcpConnection getConnection(Message msg)
{
- if (StageManager.RESPONSE_STAGE.equals(msg.getMessageType()))
+ if (StageManager.RESPONSE_STAGE.equals(msg.getMessageType())
+ || StageManager.GOSSIP_STAGE.equals(msg.getMessageType()))
{
if (ackCon == null)
ackCon = newCon();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=901437&r1=901436&r2=901437&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Jan 20 23:38:20 2010
@@ -232,11 +232,6 @@
isClientMode = true;
logger_.info("Starting up client gossip");
MessagingService.instance.listen(FBUtilities.getLocalAddress());
- MessagingService.instance.listenUDP(FBUtilities.getLocalAddress());
-
- SelectorManager.getSelectorManager().start();
- SelectorManager.getUdpSelectorManager().start();
-
Gossiper.instance.register(this);
Gossiper.instance.start(FBUtilities.getLocalAddress(), (int)(System.currentTimeMillis() / 1000)); // needed for node-ring gathering.
}
@@ -248,13 +243,7 @@
DatabaseDescriptor.createAllDirectories();
logger_.info("Starting up server gossip");
- /* Listen for application messages */
MessagingService.instance.listen(FBUtilities.getLocalAddress());
- /* Listen for control messages */
- MessagingService.instance.listenUDP(FBUtilities.getLocalAddress());
-
- SelectorManager.getSelectorManager().start();
- SelectorManager.getUdpSelectorManager().start();
StorageLoadBalancer.instance.startBroadcasting();
Modified: incubator/cassandra/trunk/test/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/conf/storage-conf.xml?rev=901437&r1=901436&r2=901437&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/test/conf/storage-conf.xml Wed Jan 20 23:38:20 2010
@@ -29,7 +29,6 @@
<RpcTimeoutInMillis>5000</RpcTimeoutInMillis>
<ListenAddress>127.0.0.1</ListenAddress>
<StoragePort>7010</StoragePort>
- <ControlPort>7011</ControlPort>
<ThriftPort>9170</ThriftPort>
<ColumnIndexSizeInKB>4</ColumnIndexSizeInKB>
<CommitLogDirectory>build/test/cassandra/commitlog</CommitLogDirectory>
Added: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java?rev=901437&view=auto
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java (added)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java Wed Jan 20 23:38:20 2010
@@ -0,0 +1,47 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.io.File;
+import java.io.IOException;
+
+public class StorageServiceClientTest
+{
+ @Test
+ public void testClientOnlyMode() throws IOException
+ {
+ CleanupHelper.mkdirs();
+ CleanupHelper.cleanup();
+ StorageService.instance.initClient();
+
+ // verify that no storage directories were created.
+ for (String path : DatabaseDescriptor.getAllDataFileLocations())
+ {
+ assertFalse(new File(path).exists());
+ }
+ StorageService.instance.stopClient();
+ }
+}
Propchange: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java (from r901436, incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceTest.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java?p2=incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java&p1=incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceTest.java&r1=901436&r2=901437&rev=901437&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java Wed Jan 20 23:38:20 2010
@@ -19,34 +19,19 @@
package org.apache.cassandra.service;
-import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.NetPackageAccessor;
-import org.junit.Test;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-
import java.io.File;
import java.io.IOException;
-public class StorageServiceTest
-{
- @Test
- public void testClientOnlyMode() throws IOException
- {
- CleanupHelper.mkdirs();
- CleanupHelper.cleanup();
- StorageService.instance.initClient();
+import org.junit.Test;
- // verify that no storage directories were created.
- for (String path : DatabaseDescriptor.getAllDataFileLocations())
- {
- assertFalse(new File(path).exists());
- }
- StorageService.instance.stopClient();
- NetPackageAccessor.resetSelectorManager();
- }
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class StorageServiceServerTest
+{
@Test
public void testRegularMode() throws IOException, InterruptedException
{
@@ -63,6 +48,5 @@
// stopping the client.
//StorageService.instance.decommission();
StorageService.instance.stopClient();
- NetPackageAccessor.resetSelectorManager();
}
-}
+}
\ No newline at end of file
Propchange: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
------------------------------------------------------------------------------
svn:eol-style = native