You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/09/13 23:08:49 UTC
svn commit: r1170350 [2/2] - in /cassandra/trunk: ./ conf/ contrib/
interface/ interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/db/compaction/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/hadoop/ ...
Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Sep 13 21:08:48 2011
@@ -165,7 +165,12 @@ public class BootStrapper
}
if (endpoints.isEmpty())
- throw new RuntimeException("No other nodes seen! Unable to bootstrap");
+ throw new RuntimeException("No other nodes seen! Unable to bootstrap."
+ + "If you intended to start a single-node cluster, you should make sure "
+ + "your broadcast_address (or listen_address) is listed as a seed. "
+ + "Otherwise, you need to determine why the seed being contacted "
+ + "has no knowledge of the rest of the cluster. Usually, this can be solved "
+ + "by giving all nodes the same seed list.");
Collections.sort(endpoints, new Comparator<InetAddress>()
{
public int compare(InetAddress ia1, InetAddress ia2)
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Tue Sep 13 21:08:48 2011
@@ -143,6 +143,8 @@ public class ColumnFamilyRecordReader ex
return true;
}
+ // We don't use the endpoint snitch because we also want to support Hadoop nodes that are
+ // not necessarily on Cassandra machines. This should be adequate for single-DC clusters, at least.
private String getLocation()
{
InetAddress[] localAddresses;
@@ -173,7 +175,7 @@ public class ColumnFamilyRecordReader ex
}
}
}
- throw new UnsupportedOperationException("no local connection available");
+ return split.getLocations()[0];
}
private class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java Tue Sep 13 21:08:48 2011
@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
public class MappedFileDataInput extends AbstractDataInput implements FileDataInput
{
private final MappedByteBuffer buffer;
@@ -117,11 +119,18 @@ public class MappedFileDataInput extends
throw new IOException(String.format("mmap segment underflow; remaining is %d but %d requested",
remaining, length));
+ if (length == 0)
+ return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
ByteBuffer bytes = buffer.duplicate();
bytes.position(buffer.position() + position).limit(buffer.position() + position + length);
position += length;
- return bytes;
+ // we have to copy the data in case we unreference the underlying sstable. See CASSANDRA-3179
+ ByteBuffer clone = ByteBuffer.allocate(bytes.remaining());
+ clone.put(bytes);
+ clone.flip();
+ return clone;
}
@Override
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Sep 13 21:08:48 2011
@@ -67,6 +67,8 @@ public class IncomingTcpConnection exten
int header = input.readInt();
isStream = MessagingService.getBits(header, 3, 1) == 1;
version = MessagingService.getBits(header, 15, 8);
+ if (logger.isDebugEnabled())
+ logger.debug("Version for " + socket.getInetAddress() + " is " + version);
if (isStream)
{
if (version == MessagingService.version_)
@@ -98,6 +100,7 @@ public class IncomingTcpConnection exten
else if (msg != null)
{
Gossiper.instance.setVersion(msg.getFrom(), version);
+ logger.debug("set version for {} to {}", socket.getInetAddress(), version);
}
// loop to get the next message.
@@ -108,6 +111,7 @@ public class IncomingTcpConnection exten
header = input.readInt();
assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections cannot change type: " + isStream;
version = MessagingService.getBits(header, 15, 8);
+ logger.debug("Version is now {}", version);
receiveMessage(input, version);
}
}
@@ -153,7 +157,7 @@ public class IncomingTcpConnection exten
MessagingService.instance().receive(message, id);
return message;
}
- logger.info("Received connection from newer protocol version. Ignorning message.");
+ logger.debug("Received connection from newer protocol version {}. Ignorning message", version);
return null;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Tue Sep 13 21:08:48 2011
@@ -346,15 +346,15 @@ public abstract class AbstractCassandraD
}
start();
- } catch (Throwable e)
+ }
+ catch (Throwable e)
{
- String msg = "Exception encountered during startup.";
- logger.error(msg, e);
+ logger.error("Exception encountered during startup", e);
// try to warn user on stdout too, if we haven't already detached
- System.out.println(msg);
e.printStackTrace();
-
+ System.out.println("Exception encountered during startup: " + e.getMessage());
+
System.exit(3);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java Tue Sep 13 21:08:48 2011
@@ -107,7 +107,7 @@ public class GCInspector
if (previousCount == null)
previousCount = 0L;
- if (count == previousCount)
+ if (count.equals(previousCount))
continue;
gccounts.put(gc.getName(), count);
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java Tue Sep 13 21:08:48 2011
@@ -79,7 +79,7 @@ public class RowRepairResolver extends A
endpoints.add(message.getFrom());
// compute maxLiveColumns to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
- int liveColumns = cf.getLiveColumnCount();
+ int liveColumns = cf == null ? 0 : cf.getLiveColumnCount();
if (liveColumns > maxLiveColumns)
maxLiveColumns = liveColumns;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Sep 13 21:08:48 2011
@@ -31,9 +31,7 @@ import javax.lang.model.type.TypeKind;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.*;
import org.apache.cassandra.config.*;
import org.apache.log4j.Level;
import org.apache.commons.lang.StringUtils;
@@ -44,6 +42,7 @@ import org.apache.cassandra.concurrent.D
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.gms.*;
@@ -641,11 +640,6 @@ public class StorageService implements I
*/
public Map<Range, List<String>> getRangeToEndpointMap(String keyspace)
{
- // some people just want to get a visual representation of things. Allow null and set it to the first
- // non-system table.
- if (keyspace == null)
- keyspace = Schema.instance.getNonSystemTables().get(0);
-
/* All the ranges for the tokens */
Map<Range, List<String>> map = new HashMap<Range, List<String>>();
for (Map.Entry<Range,List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet())
@@ -656,17 +650,27 @@ public class StorageService implements I
}
/**
+ * Return the rpc address associated with an endpoint as a string.
+ * @param endpoint The endpoint to get rpc address for
+ * @return the rpc address for the endpoint, as a string
+ */
+ public String getRpcaddress(InetAddress endpoint)
+ {
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ return DatabaseDescriptor.getRpcAddress().getHostAddress();
+ else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS) == null)
+ return endpoint.getHostAddress();
+ else
+ return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS).value;
+ }
+
+ /**
* for a keyspace, return the ranges and corresponding RPC addresses for a given keyspace.
* @param keyspace
* @return
*/
public Map<Range, List<String>> getRangeToRpcaddressMap(String keyspace)
{
- // some people just want to get a visual representation of things. Allow null and set it to the first
- // non-system table.
- if (keyspace == null)
- keyspace = Schema.instance.getNonSystemTables().get(0);
-
/* All the ranges for the tokens */
Map<Range, List<String>> map = new HashMap<Range, List<String>>();
for (Map.Entry<Range,List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet())
@@ -674,12 +678,7 @@ public class StorageService implements I
List<String> rpcaddrs = new ArrayList<String>();
for (InetAddress endpoint: entry.getValue())
{
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
- rpcaddrs.add(DatabaseDescriptor.getRpcAddress().getHostAddress());
- else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS) == null)
- rpcaddrs.add(endpoint.getHostAddress());
- else
- rpcaddrs.add(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS).value);
+ rpcaddrs.add(getRpcaddress(endpoint));
}
map.put(entry.getKey(), rpcaddrs);
}
@@ -704,6 +703,11 @@ public class StorageService implements I
public Map<Range, List<InetAddress>> getRangeToAddressMap(String keyspace)
{
+ // some people just want to get a visual representation of things. Allow null and set it to the first
+ // non-system table.
+ if (keyspace == null)
+ keyspace = Schema.instance.getNonSystemTables().get(0);
+
List<Range> ranges = getAllRanges(tokenMetadata_.sortedTokens());
return constructRangeToEndpointMap(keyspace, ranges);
}
@@ -1096,10 +1100,9 @@ public class StorageService implements I
// all leaving nodes are gone.
for (Range range : affectedRanges)
{
- Collection<InetAddress> currentEndpoints = strategy.calculateNaturalEndpoints(range.right, tm);
- Collection<InetAddress> newEndpoints = strategy.calculateNaturalEndpoints(range.right, allLeftMetadata);
- newEndpoints.removeAll(currentEndpoints);
- pendingRanges.putAll(range, newEndpoints);
+ Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm));
+ Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+ pendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints));
}
// At this stage pendingRanges has been updated according to leave operations. We can
@@ -2086,8 +2089,9 @@ public class StorageService implements I
for (Range toStream : rangesPerTable.left)
{
- List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(toStream.right, tokenMetaClone);
- rangeWithEndpoints.putAll(toStream, endpoints);
+ Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetadata_));
+ Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaClone));
+ rangeWithEndpoints.putAll(toStream, Sets.difference(newEndpoints, currentEndpoints));
}
// associating table with range-to-endpoints map
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1170350&r1=1170349&r2=1170350&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Tue Sep 13 21:08:48 2011
@@ -798,6 +798,7 @@ public class CassandraServer implements
{
Range range = entry.getKey();
List<String> endpoints = new ArrayList<String>();
+ List<String> rpc_endpoints = new ArrayList<String>();
List<EndpointDetails> epDetails = new ArrayList<EndpointDetails>();
for (InetAddress endpoint : entry.getValue())
@@ -823,13 +824,16 @@ public class CassandraServer implements
details.datacenter = appStateDc.value;
endpoints.add(details.host);
+ rpc_endpoints.add(StorageService.instance.getRpcaddress(endpoint));
if (details.port != -1 || details.datacenter != null)
epDetails.add(details);
}
- ranges.add(new TokenRange(tf.toString(range.left), tf.toString(range.right), endpoints)
- .setEndpoint_details(epDetails));
+ TokenRange tr = new TokenRange(tf.toString(range.left), tf.toString(range.right), endpoints)
+ .setEndpoint_details(epDetails)
+ .setRpc_endpoints(rpc_endpoints);
+ ranges.add(tr);
}
return ranges;