You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/08/16 18:28:44 UTC
svn commit: r986017 - in /cassandra/trunk: ./
src/java/org/apache/cassandra/avro/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/thrift/
Author: jbellis
Date: Mon Aug 16 16:28:43 2010
New Revision: 986017
URL: http://svn.apache.org/viewvc?rev=986017&view=rev
Log:
merge StorageProxy.mutate, mutateBlocking. patch by Sylvain Lebresne; reviewed by jbellis for CASSANDRA-1396
Added:
cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/NoConsistencyWriteResponseHandler.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Aug 16 16:28:43 2010
@@ -10,6 +10,7 @@ dev
* rename RackAwareStrategy to OldNetworkTopologyStrategy, RackUnawareStrategy
to SimpleStrategy, DatacenterShardStrategy to NetworkTopologyStrategy,
AbstractRackAwareSnitch to AbstractNetworkTopologySnitch (CASSANDRA-1392)
+ * merge StorageProxy.mutate, mutateBlocking (CASSANDRA-1396)
0.7-beta1
Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Mon Aug 16 16:28:43 2010
@@ -421,37 +421,22 @@ public class CassandraServer implements
private void doInsert(ConsistencyLevel consistency, RowMutation rm) throws UnavailableException, TimedOutException
{
- if (consistency != ConsistencyLevel.ZERO)
+ try
{
- try
- {
- schedule();
- StorageProxy.mutateBlocking(Arrays.asList(rm), thriftConsistencyLevel(consistency));
- }
- catch (TimeoutException e)
- {
- throw new TimedOutException();
- }
- catch (org.apache.cassandra.thrift.UnavailableException thriftE)
- {
- throw new UnavailableException();
- }
- finally
- {
- release();
- }
+ schedule();
+ StorageProxy.mutate(Arrays.asList(rm), thriftConsistencyLevel(consistency));
}
- else
+ catch (TimeoutException e)
{
- try
- {
- schedule();
- StorageProxy.mutate(Arrays.asList(rm));
- }
- finally
- {
- release();
- }
+ throw new TimedOutException();
+ }
+ catch (org.apache.cassandra.thrift.UnavailableException thriftE)
+ {
+ throw new UnavailableException();
+ }
+ finally
+ {
+ release();
}
}
@@ -479,38 +464,23 @@ public class CassandraServer implements
rowMutations.add(getRowMutationFromMutations(curKeyspace.get(), pair.key.array(), cfToMutations));
}
- if (consistencyLevel == ConsistencyLevel.ZERO)
+ try
{
- try
- {
- schedule();
- StorageProxy.mutate(rowMutations);
- }
- finally
- {
- release();
- }
+ schedule();
+ StorageProxy.mutate(rowMutations, thriftConsistencyLevel(consistencyLevel));
}
- else
+ catch (TimeoutException te)
{
- try
- {
- schedule();
- StorageProxy.mutateBlocking(rowMutations, thriftConsistencyLevel(consistencyLevel));
- }
- catch (TimeoutException te)
- {
- throw newTimedOutException();
- }
- // FIXME: StorageProxy.mutateBlocking throws Thrift's UnavailableException
- catch (org.apache.cassandra.thrift.UnavailableException ue)
- {
- throw newUnavailableException();
- }
- finally
- {
- release();
- }
+ throw newTimedOutException();
+ }
+ // FIXME: StorageProxy.mutate throws Thrift's UnavailableException
+ catch (org.apache.cassandra.thrift.UnavailableException ue)
+ {
+ throw newUnavailableException();
+ }
+ finally
+ {
+ release();
}
return null;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Mon Aug 16 16:28:43 2010
@@ -40,6 +40,7 @@ import org.apache.cassandra.net.Messagin
import org.apache.cassandra.service.DigestMismatchException;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.WriteResponseHandler;
+import org.apache.cassandra.service.IWriteResponseHandler;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
@@ -122,7 +123,7 @@ public class HintedHandOffManager
RowMutation rm = new RowMutation(tableName, key);
rm.add(cf);
Message message = rm.makeRowMutationMessage();
- WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint);
+ IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
MessagingService.instance.sendRR(message, new InetAddress[] { endpoint }, responseHandler);
try
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Mon Aug 16 16:28:43 2010
@@ -104,11 +104,11 @@ public abstract class AbstractReplicatio
*/
public abstract Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata) throws IllegalStateException;
- public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints,
- Multimap<InetAddress, InetAddress> hintedEndpoints,
- ConsistencyLevel consistencyLevel)
+ public IWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints,
+ Multimap<InetAddress, InetAddress> hintedEndpoints,
+ ConsistencyLevel consistencyLevel)
{
- return new WriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+ return WriteResponseHandler.create(writeEndpoints, hintedEndpoints, consistencyLevel, table);
}
// instance method so test subclasses can override it
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java Mon Aug 16 16:28:43 2010
@@ -152,16 +152,16 @@ public class NetworkTopologyStrategy ext
* return a DCQRH with a map of all the DC rep factor.
*/
@Override
- public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistency_level)
+ public IWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistency_level)
{
if (consistency_level == ConsistencyLevel.DCQUORUM)
{
// block for in this context will be localnodes block.
- return new DatacenterWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level, table);
+ return DatacenterWriteResponseHandler.create(writeEndpoints, hintedEndpoints, consistency_level, table);
}
else if (consistency_level == ConsistencyLevel.DCQUORUMSYNC)
{
- return new DatacenterSyncWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level, table);
+ return DatacenterSyncWriteResponseHandler.create(writeEndpoints, hintedEndpoints, consistency_level, table);
}
return super.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java Mon Aug 16 16:28:43 2010
@@ -28,13 +28,13 @@ import java.util.concurrent.TimeoutExcep
import com.google.common.collect.Multimap;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.SimpleCondition;
-public abstract class AbstractWriteResponseHandler implements IAsyncCallback
+public abstract class AbstractWriteResponseHandler implements IWriteResponseHandler
{
protected final SimpleCondition condition = new SimpleCondition();
protected final long startTime;
@@ -42,7 +42,7 @@ public abstract class AbstractWriteRespo
protected final Multimap<InetAddress, InetAddress> hintedEndpoints;
protected final ConsistencyLevel consistencyLevel;
- public AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel)
+ protected AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel)
{
startTime = System.currentTimeMillis();
this.consistencyLevel = consistencyLevel;
@@ -69,6 +69,13 @@ public abstract class AbstractWriteRespo
}
}
+ public void addHintCallback(Message hintedMessage, InetAddress destination)
+ {
+ // (non-destination hints are part of the callback and count towards consistency only under CL.ANY)
+ if (writeEndpoints.contains(destination) || consistencyLevel == ConsistencyLevel.ANY)
+ MessagingService.instance.addCallback(this, hintedMessage.getMessageId());
+ }
+
/** null message means "response from local write" */
public abstract void response(Message msg);
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Mon Aug 16 16:28:43 2010
@@ -56,7 +56,7 @@ public class DatacenterSyncWriteResponse
private final NetworkTopologyStrategy strategy;
private HashMap<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>();
- public DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+ protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
{
// Response is been managed by the map so make it 1 for the superclass.
super(writeEndpoints, hintedEndpoints, consistencyLevel);
@@ -71,6 +71,18 @@ public class DatacenterSyncWriteResponse
}
}
+ public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+ {
+ if (consistencyLevel == ConsistencyLevel.ZERO)
+ {
+ return NoConsistencyWriteResponseHandler.instance;
+ }
+ else
+ {
+ return new DatacenterSyncWriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+ }
+ }
+
public void response(Message message)
{
String dataCenter = message == null
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java Mon Aug 16 16:28:43 2010
@@ -50,12 +50,23 @@ public class DatacenterWriteResponseHand
localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
}
- public DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+ protected DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
{
super(writeEndpoints, hintedEndpoints, consistencyLevel, table);
assert consistencyLevel == ConsistencyLevel.DCQUORUM;
}
+ public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+ {
+ if (consistencyLevel == ConsistencyLevel.ZERO)
+ {
+ return NoConsistencyWriteResponseHandler.instance;
+ }
+ else
+ {
+ return new DatacenterWriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+ }
+ }
@Override
protected int determineBlockFor(String table)
Added: cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java?rev=986017&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java Mon Aug 16 16:28:43 2010
@@ -0,0 +1,35 @@
+package org.apache.cassandra.service;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.net.InetAddress;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.UnavailableException;
+
+public interface IWriteResponseHandler extends IAsyncCallback
+{
+ public void get() throws TimeoutException;
+ public void addHintCallback(Message hintedMessage, InetAddress destination);
+ public void assureSufficientLiveNodes() throws UnavailableException;
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/service/NoConsistencyWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/NoConsistencyWriteResponseHandler.java?rev=986017&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/NoConsistencyWriteResponseHandler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/NoConsistencyWriteResponseHandler.java Mon Aug 16 16:28:43 2010
@@ -0,0 +1,20 @@
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.UnavailableException;
+
+class NoConsistencyWriteResponseHandler implements IWriteResponseHandler
+{
+ static final IWriteResponseHandler instance = new NoConsistencyWriteResponseHandler();
+
+ public void get() throws TimeoutException {}
+
+ public void addHintCallback(Message hintedMessage, InetAddress destination) {}
+
+ public void response(Message msg) {}
+
+ public void assureSufficientLiveNodes() throws UnavailableException {}
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Aug 16 16:28:43 2010
@@ -84,100 +84,13 @@ public class StorageProxy implements Sto
* of the possibility of a replica being down and hint
* the data across to some other replica.
*
- * This is the ZERO consistency level. We do not wait for replies.
- *
* @param mutations the mutations to be applied across the replicas
+ * @param consistency_level the consistency level for the operation
*/
- public static void mutate(List<RowMutation> mutations)
+ public static void mutate(List<RowMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimeoutException
{
long startTime = System.nanoTime();
- try
- {
- StorageService ss = StorageService.instance;
- for (final RowMutation rm: mutations)
- {
- try
- {
- String table = rm.getTable();
- AbstractReplicationStrategy rs = ss.getReplicationStrategy(table);
-
- List<InetAddress> naturalEndpoints = ss.getNaturalEndpoints(table, rm.key());
- Multimap<InetAddress,InetAddress> hintedEndpoints = rs.getHintedEndpoints(naturalEndpoints);
- Message unhintedMessage = null; // lazy initialize for non-local, unhinted writes
-
- // 3 cases:
- // 1. local, unhinted write: run directly on write stage
- // 2. non-local, unhinted write: send row mutation message
- // 3. hinted write: add hint header, and send message
- for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
- {
- InetAddress destination = entry.getKey();
- Collection<InetAddress> targets = entry.getValue();
- if (targets.size() == 1 && targets.iterator().next().equals(destination))
- {
- // unhinted writes
- if (destination.equals(FBUtilities.getLocalAddress()))
- {
- if (logger.isDebugEnabled())
- logger.debug("insert writing local " + rm.toString(true));
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws IOException
- {
- rm.apply();
- }
- };
- StageManager.getStage(StageManager.MUTATION_STAGE).execute(runnable);
- }
- else
- {
- if (unhintedMessage == null)
- unhintedMessage = rm.makeRowMutationMessage();
- if (logger.isDebugEnabled())
- logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination);
- MessagingService.instance.sendOneWay(unhintedMessage, destination);
- }
- }
- else
- {
- // hinted
- Message hintedMessage = rm.makeRowMutationMessage();
- for (InetAddress target : targets)
- {
- if (!target.equals(destination))
- {
- addHintHeader(hintedMessage, target);
- if (logger.isDebugEnabled())
- logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() + "@" + destination + " for " + target);
- }
- }
- MessagingService.instance.sendOneWay(hintedMessage, destination);
- }
- }
- }
- catch (IOException e)
- {
- throw new RuntimeException("error inserting key " + FBUtilities.bytesToHex(rm.key()), e);
- }
- }
- }
- finally
- {
- writeStats.addNano(System.nanoTime() - startTime);
- }
- }
-
- private static void addHintHeader(Message message, InetAddress target)
- {
- byte[] oldHint = message.getHeader(RowMutation.HINT);
- byte[] hint = oldHint == null ? target.getAddress() : ArrayUtils.addAll(oldHint, target.getAddress());
- message.setHeader(RowMutation.HINT, hint);
- }
-
- public static void mutateBlocking(List<RowMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimeoutException
- {
- long startTime = System.nanoTime();
- ArrayList<AbstractWriteResponseHandler> responseHandlers = new ArrayList<AbstractWriteResponseHandler>();
+ ArrayList<IWriteResponseHandler> responseHandlers = new ArrayList<IWriteResponseHandler>();
RowMutation mostRecentRowMutation = null;
StorageService ss = StorageService.instance;
@@ -195,7 +108,7 @@ public class StorageProxy implements Sto
Multimap<InetAddress, InetAddress> hintedEndpoints = rs.getHintedEndpoints(writeEndpoints);
// send out the writes, as in mutate() above, but this time with a callback that tracks responses
- final AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level);
+ final IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level);
responseHandler.assureSufficientLiveNodes();
responseHandlers.add(responseHandler);
@@ -238,15 +151,13 @@ public class StorageProxy implements Sto
logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() + "@" + destination + " for " + target);
}
}
- // (non-destination hints are part of the callback and count towards consistency only under CL.ANY)
- if (writeEndpoints.contains(destination) || consistency_level == ConsistencyLevel.ANY)
- MessagingService.instance.addCallback(responseHandler, hintedMessage.getMessageId());
+ responseHandler.addHintCallback(hintedMessage, destination);
MessagingService.instance.sendOneWay(hintedMessage, destination);
}
}
}
// wait for writes. throws timeoutexception if necessary
- for (AbstractWriteResponseHandler responseHandler : responseHandlers)
+ for (IWriteResponseHandler responseHandler : responseHandlers)
{
responseHandler.get();
}
@@ -265,7 +176,14 @@ public class StorageProxy implements Sto
}
- private static void insertLocalMessage(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)
+ private static void addHintHeader(Message message, InetAddress target)
+ {
+ byte[] oldHint = message.getHeader(RowMutation.HINT);
+ byte[] hint = oldHint == null ? target.getAddress() : ArrayUtils.addAll(oldHint, target.getAddress());
+ message.setHeader(RowMutation.HINT, hint);
+ }
+
+ private static void insertLocalMessage(final RowMutation rm, final IWriteResponseHandler responseHandler)
{
if (logger.isDebugEnabled())
logger.debug("insert writing local " + rm.toString(true));
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java Mon Aug 16 16:28:43 2010
@@ -39,15 +39,15 @@ public class WriteResponseHandler extend
{
protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class);
- protected AtomicInteger responses;
+ protected final AtomicInteger responses;
- public WriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+ protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
{
super(writeEndpoints, hintedEndpoints, consistencyLevel);
responses = new AtomicInteger(determineBlockFor(table));
}
- public WriteResponseHandler(InetAddress endpoint)
+ protected WriteResponseHandler(InetAddress endpoint)
{
super(Arrays.asList(endpoint),
ImmutableMultimap.<InetAddress, InetAddress>builder().put(endpoint, endpoint).build(),
@@ -55,6 +55,23 @@ public class WriteResponseHandler extend
responses = new AtomicInteger(1);
}
+ public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+ {
+ if (consistencyLevel == ConsistencyLevel.ZERO)
+ {
+ return NoConsistencyWriteResponseHandler.instance;
+ }
+ else
+ {
+ return new WriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+ }
+ }
+
+ public static IWriteResponseHandler create(InetAddress endpoint)
+ {
+ return new WriteResponseHandler(endpoint);
+ }
+
public void response(Message m)
{
if (responses.decrementAndGet() == 0)
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=986017&r1=986016&r2=986017&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Mon Aug 16 16:28:43 2010
@@ -463,20 +463,13 @@ public class CassandraServer implements
{
schedule();
- if (consistency_level == ConsistencyLevel.ZERO)
+ try
{
- StorageProxy.mutate(mutations);
+ StorageProxy.mutate(mutations, consistency_level);
}
- else
+ catch (TimeoutException e)
{
- try
- {
- StorageProxy.mutateBlocking(mutations, consistency_level);
- }
- catch (TimeoutException e)
- {
- throw new TimedOutException();
- }
+ throw new TimedOutException();
}
}
finally