You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/09/13 23:08:49 UTC

svn commit: r1170350 [2/2] - in /cassandra/trunk: ./ conf/ contrib/ interface/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/hadoop/ ...

Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Sep 13 21:08:48 2011
@@ -165,7 +165,12 @@ public class BootStrapper
         }
 
         if (endpoints.isEmpty())
-            throw new RuntimeException("No other nodes seen!  Unable to bootstrap");
+            throw new RuntimeException("No other nodes seen!  Unable to bootstrap."
+                                       + "If you intended to start a single-node cluster, you should make sure "
+                                       + "your broadcast_address (or listen_address) is listed as a seed.  "
+                                       + "Otherwise, you need to determine why the seed being contacted "
+                                       + "has no knowledge of the rest of the cluster.  Usually, this can be solved "
+                                       + "by giving all nodes the same seed list.");
         Collections.sort(endpoints, new Comparator<InetAddress>()
         {
             public int compare(InetAddress ia1, InetAddress ia2)

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Tue Sep 13 21:08:48 2011
@@ -143,6 +143,8 @@ public class ColumnFamilyRecordReader ex
         return true;
     }
 
+    // We don't use the endpoint snitch here because we also want to support Hadoop nodes that are
+    // not necessarily on Cassandra machines.  This should be adequate for single-DC clusters, at least.
     private String getLocation()
     {
         InetAddress[] localAddresses;
@@ -173,7 +175,7 @@ public class ColumnFamilyRecordReader ex
                 }
             }
         }
-        throw new UnsupportedOperationException("no local connection available");
+        return split.getLocations()[0];
     }
 
     private class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java Tue Sep 13 21:08:48 2011
@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 
+import org.apache.cassandra.utils.ByteBufferUtil;
+
 public class MappedFileDataInput extends AbstractDataInput implements FileDataInput
 {
     private final MappedByteBuffer buffer;
@@ -117,11 +119,18 @@ public class MappedFileDataInput extends
             throw new IOException(String.format("mmap segment underflow; remaining is %d but %d requested",
                                                 remaining, length));
 
+        if (length == 0)
+            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
         ByteBuffer bytes = buffer.duplicate();
         bytes.position(buffer.position() + position).limit(buffer.position() + position + length);
         position += length;
 
-        return bytes;
+        // we have to copy the data in case we unreference the underlying sstable.  See CASSANDRA-3179
+        ByteBuffer clone = ByteBuffer.allocate(bytes.remaining());
+        clone.put(bytes);
+        clone.flip();
+        return clone;
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Sep 13 21:08:48 2011
@@ -67,6 +67,8 @@ public class IncomingTcpConnection exten
             int header = input.readInt();
             isStream = MessagingService.getBits(header, 3, 1) == 1;
             version = MessagingService.getBits(header, 15, 8);
+            if (logger.isDebugEnabled())
+                logger.debug("Version for " + socket.getInetAddress() + " is " + version);
             if (isStream)
             {
                 if (version == MessagingService.version_)
@@ -98,6 +100,7 @@ public class IncomingTcpConnection exten
             else if (msg != null)
             {
                 Gossiper.instance.setVersion(msg.getFrom(), version);
+                logger.debug("set version for {} to {}", socket.getInetAddress(), version);
             }
             
             // loop to get the next message.
@@ -108,6 +111,7 @@ public class IncomingTcpConnection exten
                 header = input.readInt();
                 assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections cannot change type: " + isStream;
                 version = MessagingService.getBits(header, 15, 8);
+                logger.debug("Version is now {}", version);
                 receiveMessage(input, version);
             }
         } 
@@ -153,7 +157,7 @@ public class IncomingTcpConnection exten
             MessagingService.instance().receive(message, id);
             return message;
         }
-        logger.info("Received connection from newer protocol version. Ignorning message.");
+        logger.debug("Received connection from newer protocol version {}. Ignorning message", version);
         return null;
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Tue Sep 13 21:08:48 2011
@@ -346,15 +346,15 @@ public abstract class AbstractCassandraD
             }
             
             start();
-        } catch (Throwable e)
+        }
+        catch (Throwable e)
         {
-            String msg = "Exception encountered during startup.";
-            logger.error(msg, e);
+            logger.error("Exception encountered during startup", e);
             
             // try to warn user on stdout too, if we haven't already detached
-            System.out.println(msg);
             e.printStackTrace();
-            
+            System.out.println("Exception encountered during startup: " + e.getMessage());
+
             System.exit(3);
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java Tue Sep 13 21:08:48 2011
@@ -107,7 +107,7 @@ public class GCInspector
             
             if (previousCount == null)
                 previousCount = 0L;           
-            if (count == previousCount)
+            if (count.equals(previousCount))
                 continue;
             
             gccounts.put(gc.getName(), count);

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java Tue Sep 13 21:08:48 2011
@@ -79,7 +79,7 @@ public class RowRepairResolver extends A
                 endpoints.add(message.getFrom());
 
                 // compute maxLiveColumns to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
-                int liveColumns = cf.getLiveColumnCount();
+                int liveColumns = cf == null ? 0 : cf.getLiveColumnCount();
                 if (liveColumns > maxLiveColumns)
                     maxLiveColumns = liveColumns;
             }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Sep 13 21:08:48 2011
@@ -31,9 +31,7 @@ import javax.lang.model.type.TypeKind;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.*;
 import org.apache.cassandra.config.*;
 import org.apache.log4j.Level;
 import org.apache.commons.lang.StringUtils;
@@ -44,6 +42,7 @@ import org.apache.cassandra.concurrent.D
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.gms.*;
@@ -641,11 +640,6 @@ public class StorageService implements I
      */
     public Map<Range, List<String>> getRangeToEndpointMap(String keyspace)
     {
-        // some people just want to get a visual representation of things. Allow null and set it to the first
-        // non-system table.
-        if (keyspace == null)
-            keyspace = Schema.instance.getNonSystemTables().get(0);
-
         /* All the ranges for the tokens */
         Map<Range, List<String>> map = new HashMap<Range, List<String>>();
         for (Map.Entry<Range,List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet())
@@ -656,17 +650,27 @@ public class StorageService implements I
     }
 
     /**
+     * Return the rpc address associated with an endpoint as a string.
+     * @param endpoint The endpoint to get rpc address for
+     * @return our configured rpc address for the local node, else the gossiped RPC_ADDRESS, else the endpoint's host address
+     */
+    public String getRpcaddress(InetAddress endpoint)
+    {
+        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+            return DatabaseDescriptor.getRpcAddress().getHostAddress();
+        // look the gossip state up once and null-check it, so an unknown endpoint falls back instead of throwing an NPE
+        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+        VersionedValue rpcAddress = state == null ? null : state.getApplicationState(ApplicationState.RPC_ADDRESS);
+        return rpcAddress == null ? endpoint.getHostAddress() : rpcAddress.value;
+    }
+
+    /**
      * for a keyspace, return the ranges and corresponding RPC addresses for a given keyspace.
      * @param keyspace
      * @return
      */
     public Map<Range, List<String>> getRangeToRpcaddressMap(String keyspace)
     {
-        // some people just want to get a visual representation of things. Allow null and set it to the first
-        // non-system table.
-        if (keyspace == null)
-            keyspace = Schema.instance.getNonSystemTables().get(0);
-
         /* All the ranges for the tokens */
         Map<Range, List<String>> map = new HashMap<Range, List<String>>();
         for (Map.Entry<Range,List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet())
@@ -674,12 +678,7 @@ public class StorageService implements I
             List<String> rpcaddrs = new ArrayList<String>();
             for (InetAddress endpoint: entry.getValue())
             {
-                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
-                    rpcaddrs.add(DatabaseDescriptor.getRpcAddress().getHostAddress());
-                else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS) == null)
-                    rpcaddrs.add(endpoint.getHostAddress());
-                else
-                    rpcaddrs.add(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS).value);
+                rpcaddrs.add(getRpcaddress(endpoint));
             }
             map.put(entry.getKey(), rpcaddrs);
         }
@@ -704,6 +703,11 @@ public class StorageService implements I
 
     public Map<Range, List<InetAddress>> getRangeToAddressMap(String keyspace)
     {
+        // Some callers only want a visual representation of things, so a null keyspace is allowed and is
+        // replaced by the first non-system table. NOTE(review): get(0) presumably throws if there is none -- confirm.
+        if (keyspace == null)
+            keyspace = Schema.instance.getNonSystemTables().get(0);
+
         List<Range> ranges = getAllRanges(tokenMetadata_.sortedTokens());
         return constructRangeToEndpointMap(keyspace, ranges);
     }
@@ -1096,10 +1100,9 @@ public class StorageService implements I
         // all leaving nodes are gone.
         for (Range range : affectedRanges)
         {
-            Collection<InetAddress> currentEndpoints = strategy.calculateNaturalEndpoints(range.right, tm);
-            Collection<InetAddress> newEndpoints = strategy.calculateNaturalEndpoints(range.right, allLeftMetadata);
-            newEndpoints.removeAll(currentEndpoints);
-            pendingRanges.putAll(range, newEndpoints);
+            Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm));
+            Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+            pendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints));
         }
 
         // At this stage pendingRanges has been updated according to leave operations. We can
@@ -2086,8 +2089,9 @@ public class StorageService implements I
 
             for (Range toStream : rangesPerTable.left)
             {
-                List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(toStream.right, tokenMetaClone);
-                rangeWithEndpoints.putAll(toStream, endpoints);
+                Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetadata_));
+                Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaClone));
+                rangeWithEndpoints.putAll(toStream, Sets.difference(newEndpoints, currentEndpoints));
             }
 
             // associating table with range-to-endpoints map

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Tue Sep 13 21:08:48 2011
@@ -798,6 +798,7 @@ public class CassandraServer implements 
         {
             Range range = entry.getKey();
             List<String> endpoints = new ArrayList<String>();
+            List<String> rpc_endpoints = new ArrayList<String>();
             List<EndpointDetails> epDetails = new ArrayList<EndpointDetails>();
 
             for (InetAddress endpoint : entry.getValue())
@@ -823,13 +824,16 @@ public class CassandraServer implements 
                     details.datacenter = appStateDc.value;
 
                 endpoints.add(details.host);
+                rpc_endpoints.add(StorageService.instance.getRpcaddress(endpoint));
 
                 if (details.port != -1 || details.datacenter != null)
                     epDetails.add(details);
             }
 
-            ranges.add(new TokenRange(tf.toString(range.left), tf.toString(range.right), endpoints)
-                                      .setEndpoint_details(epDetails));
+            TokenRange tr = new TokenRange(tf.toString(range.left), tf.toString(range.right), endpoints)
+                            .setEndpoint_details(epDetails)
+                            .setRpc_endpoints(rpc_endpoints);
+            ranges.add(tr);
         }
 
         return ranges;