You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2015/05/20 10:43:56 UTC
[2/3] cassandra git commit: Revert CASSANDRA-7807 (tracing completion
client notifications) (CASSANDRA-9429)
Revert CASSANDRA-7807 (tracing completion client notifications) (CASSANDRA-9429)
patch by Robert Stupp; reviewed by Aleksey Yeschenko for CASSANDRA-9429
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e4eba253
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e4eba253
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e4eba253
Branch: refs/heads/trunk
Commit: e4eba2538255fdc23cad59642be69c8b27f04218
Parents: 6eea3ea
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed May 20 08:48:53 2015 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed May 20 08:48:53 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
doc/native_protocol_v4.spec | 6 -
.../apache/cassandra/service/QueryState.java | 10 +-
.../apache/cassandra/tracing/TraceState.java | 42 +---
.../org/apache/cassandra/tracing/Tracing.java | 24 +--
.../apache/cassandra/transport/Connection.java | 9 -
.../org/apache/cassandra/transport/Event.java | 54 -----
.../org/apache/cassandra/transport/Server.java | 5 -
.../transport/messages/BatchMessage.java | 2 +-
.../transport/messages/ExecuteMessage.java | 2 +-
.../transport/messages/PrepareMessage.java | 2 +-
.../transport/messages/QueryMessage.java | 2 +-
.../cassandra/tracing/TraceCompleteTest.java | 204 -------------------
13 files changed, 19 insertions(+), 344 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4e79ea0..a227f5e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
2.2
* Ensure that UDF and UDAs are keyspace-isolated (CASSANDRA-9409)
+ * Revert CASSANDRA-7807 (tracing completion client notifications) (CASSANDRA-9429)
Merged from 2.1:
* Use configured gcgs in anticompaction (CASSANDRA-9397)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/doc/native_protocol_v4.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec
index ba3d3b3..f040323 100644
--- a/doc/native_protocol_v4.spec
+++ b/doc/native_protocol_v4.spec
@@ -130,8 +130,6 @@ Table of Contents
ignored by the default QueryHandler implementation.
Currently, only QUERY, PREPARE, EXECUTE and BATCH requests support
payload.
- If both trace-flag and payload-flag are set, the generic key-value
- payload appears after trace's data.
Type of custom payload is [bytes map] (see below).
0x08: Warning flag. The response contains warnings from the server which
were generated by the server to go along with this response.
@@ -755,9 +753,6 @@ Table of Contents
- [string] keyspace containing the user defined function / aggregate
- [string] the function/aggregate name
- [string list] one string for each argument type (as CQL type)
- - "TRACE_COMPLETE": notification that a trace session has completed at least
- on the coordinator. After the event type, the rest of the message will
- contain the trace session-ID [uuid] as the only argument.
All EVENT messages have a streamId of -1 (Section 2.3).
@@ -1162,7 +1157,6 @@ Table of Contents
* Read_failure error code was added.
* Function_failure error code was added.
* Add custom payload to frames for custom QueryHandler implementations (ignored by Cassandra's standard QueryHandler)
- * Add "TRACE_COMPLETE" event (section 4.2.6).
* Add warnings to frames for responses for which the server generated a warning during processing, which the client needs to address.
* Add the date and time data types
* Add the tinyint and smallint data types
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java
index 5e89ac8..ddbc959 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -22,7 +22,6 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.Connection;
/**
* Represents the state related to a given query.
@@ -77,19 +76,14 @@ public class QueryState
public void createTracingSession()
{
- createTracingSession(null);
- }
-
- public void createTracingSession(Connection connection)
- {
UUID session = this.preparedTracingSession;
if (session == null)
{
- Tracing.instance.newSession(connection);
+ Tracing.instance.newSession();
}
else
{
- Tracing.instance.newSession(connection, session);
+ Tracing.instance.newSession(session);
this.preparedTracingSession = null;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index c029ac7..e882e67 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -35,8 +35,6 @@ import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.transport.Connection;
-import org.apache.cassandra.transport.Event;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.progress.ProgressEvent;
@@ -60,10 +58,6 @@ public class TraceState implements ProgressEventNotifier
private final List<ProgressListener> listeners = new CopyOnWriteArrayList<>();
private String tag;
- private final boolean withFinishEvent;
- private final AtomicInteger pendingMutations = new AtomicInteger();
- private final Connection connection;
-
public enum Status
{
IDLE,
@@ -79,24 +73,17 @@ public class TraceState implements ProgressEventNotifier
public TraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType)
{
- this(coordinator, null, sessionId, traceType, false);
- }
-
- public TraceState(InetAddress coordinator, Connection connection, UUID sessionId, Tracing.TraceType traceType, boolean withFinishEvent)
- {
assert coordinator != null;
assert sessionId != null;
this.coordinator = coordinator;
- this.connection = connection;
this.sessionId = sessionId;
sessionIdBytes = ByteBufferUtil.bytes(sessionId);
this.traceType = traceType;
this.ttl = traceType.getTTL();
watch = Stopwatch.createStarted();
this.status = Status.IDLE;
- this.withFinishEvent = withFinishEvent;
- }
+}
/**
* Activate notification with provided {@code tag} name.
@@ -134,19 +121,6 @@ public class TraceState implements ProgressEventNotifier
{
status = Status.STOPPED;
notifyAll();
- pushEventIfStopped();
- }
-
- private void pushEventIfStopped()
- {
- if (status == Status.STOPPED && pendingMutations.get() == 0)
- {
- // poor-man's prevention of duplicate tracing-finished events
- pendingMutations.set(Integer.MIN_VALUE);
-
- if (connection != null && withFinishEvent)
- connection.sendIfRegistered(new Event.TraceComplete(sessionId));
- }
}
/*
@@ -214,23 +188,13 @@ public class TraceState implements ProgressEventNotifier
}
}
- void executeMutation(final Mutation mutation)
+ static void executeMutation(final Mutation mutation)
{
- pendingMutations.incrementAndGet();
-
StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
{
protected void runMayThrow() throws Exception
{
- try
- {
- mutateWithCatch(mutation);
- }
- finally
- {
- if (pendingMutations.decrementAndGet() == 0)
- pushEventIfStopped();
- }
+ mutateWithCatch(mutation);
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index 0e49cd0..3f9f54d 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.transport.Connection;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -113,31 +112,26 @@ public class Tracing
return instance.state.get() != null;
}
- public UUID newSession(Connection connection)
+ public UUID newSession()
{
- return newSession(connection, TraceType.QUERY);
+ return newSession(TraceType.QUERY);
}
public UUID newSession(TraceType traceType)
{
- return newSession(null, traceType);
+ return newSession(TimeUUIDType.instance.compose(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())), traceType);
}
- public UUID newSession(Connection connection, TraceType traceType)
+ public UUID newSession(UUID sessionId)
{
- return newSession(connection, TimeUUIDType.instance.compose(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())), traceType, false);
+ return newSession(sessionId, TraceType.QUERY);
}
- public UUID newSession(Connection connection, UUID sessionId)
- {
- return newSession(connection, sessionId, TraceType.QUERY, true);
- }
-
- private UUID newSession(Connection connection, UUID sessionId, TraceType traceType, boolean withFinishEvent)
+ private UUID newSession(UUID sessionId, TraceType traceType)
{
assert state.get() == null;
- TraceState ts = new TraceState(localAddress, connection, sessionId, traceType, withFinishEvent);
+ TraceState ts = new TraceState(localAddress, sessionId, traceType);
state.set(ts);
sessions.put(sessionId, ts);
@@ -166,7 +160,7 @@ public class Tracing
final ByteBuffer sessionId = state.sessionIdBytes;
final int ttl = state.ttl;
- state.executeMutation(TraceKeyspace.makeStopSessionMutation(sessionId, elapsed, ttl));
+ TraceState.executeMutation(TraceKeyspace.makeStopSessionMutation(sessionId, elapsed, ttl));
state.stop();
sessions.remove(state.sessionId);
@@ -204,7 +198,7 @@ public class Tracing
final String command = state.traceType.toString();
final int ttl = state.ttl;
- state.executeMutation(TraceKeyspace.makeStartSessionMutation(sessionId, client, parameters, request, startedAt, command, ttl));
+ TraceState.executeMutation(TraceKeyspace.makeStartSessionMutation(sessionId, client, parameters, request, startedAt, command, ttl));
return state;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/transport/Connection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Connection.java b/src/java/org/apache/cassandra/transport/Connection.java
index e2811e9..af26557 100644
--- a/src/java/org/apache/cassandra/transport/Connection.java
+++ b/src/java/org/apache/cassandra/transport/Connection.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.transport;
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
-import org.apache.cassandra.transport.messages.EventMessage;
public class Connection
{
@@ -65,12 +64,6 @@ public class Connection
return channel;
}
- public void sendIfRegistered(Event event)
- {
- if (getTracker().isRegistered(event.type, channel))
- channel.writeAndFlush(new EventMessage(event));
- }
-
public interface Factory
{
Connection newConnection(Channel channel, int version);
@@ -79,7 +72,5 @@ public class Connection
public interface Tracker
{
void addConnection(Channel ch, Connection connection);
-
- boolean isRegistered(Event.Type type, Channel ch);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index 16b5f64..9b6fdd4 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -62,8 +62,6 @@ public abstract class Event
return StatusChange.deserializeEvent(cb, version);
case SCHEMA_CHANGE:
return SchemaChange.deserializeEvent(cb, version);
- case TRACE_COMPLETE:
- return TraceComplete.deserializeEvent(cb, version);
}
throw new AssertionError();
}
@@ -417,56 +415,4 @@ public abstract class Event
&& Objects.equal(argTypes, scc.argTypes);
}
}
-
- /**
- * @since native protocol v4
- */
- public static class TraceComplete extends Event
- {
- public final UUID traceSessionId;
-
- public TraceComplete(UUID traceSessionId)
- {
- super(Type.TRACE_COMPLETE);
- this.traceSessionId = traceSessionId;
- }
-
- public static Event deserializeEvent(ByteBuf cb, int version)
- {
- UUID traceSessionId = CBUtil.readUUID(cb);
- return new TraceComplete(traceSessionId);
- }
-
- protected void serializeEvent(ByteBuf dest, int version)
- {
- CBUtil.writeUUID(traceSessionId, dest);
- }
-
- protected int eventSerializedSize(int version)
- {
- return CBUtil.sizeOfUUID(traceSessionId);
- }
-
- @Override
- public String toString()
- {
- return traceSessionId.toString();
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hashCode(traceSessionId);
- }
-
- @Override
- public boolean equals(Object other)
- {
- if (!(other instanceof TraceComplete))
- return false;
-
- TraceComplete tf = (TraceComplete)other;
- return Objects.equal(traceSessionId, tf.traceSessionId);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 40a3371..333b956 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -233,11 +233,6 @@ public class Server implements CassandraDaemon.Server
groups.get(type).add(ch);
}
- public boolean isRegistered(Event.Type type, Channel ch)
- {
- return groups.get(type).contains(ch);
- }
-
public void send(Event event)
{
groups.get(event.type).writeAndFlush(new EventMessage(event));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index 8f144d1..4755ad3 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -165,7 +165,7 @@ public class BatchMessage extends Message.Request
if (state.traceNextQuery())
{
- state.createTracingSession(connection);
+ state.createTracingSession();
// TODO we don't have [typed] access to CQL bind variables here. CASSANDRA-4560 is open to add support.
Tracing.instance.begin("Execute batch of CQL3 queries", state.getClientAddress(), Collections.<String, String>emptyMap());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 2b21376..3eddc7d 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -122,7 +122,7 @@ public class ExecuteMessage extends Message.Request
if (state.traceNextQuery())
{
- state.createTracingSession(connection);
+ state.createTracingSession();
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
if (options.getPageSize() > 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index db9e304..f54d1d9 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -71,7 +71,7 @@ public class PrepareMessage extends Message.Request
if (state.traceNextQuery())
{
- state.createTracingSession(connection);
+ state.createTracingSession();
Tracing.instance.begin("Preparing CQL3 query", state.getClientAddress(), ImmutableMap.of("query", query));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index fe86a89..4e21678 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -106,7 +106,7 @@ public class QueryMessage extends Message.Request
if (state.traceNextQuery())
{
- state.createTracingSession(connection);
+ state.createTracingSession();
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
builder.put("query", query);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/test/unit/org/apache/cassandra/tracing/TraceCompleteTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tracing/TraceCompleteTest.java b/test/unit/org/apache/cassandra/tracing/TraceCompleteTest.java
deleted file mode 100644
index 8ef7e52..0000000
--- a/test/unit/org/apache/cassandra/tracing/TraceCompleteTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.tracing;
-
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.transport.Event;
-import org.apache.cassandra.transport.Message;
-import org.apache.cassandra.transport.ProtocolException;
-import org.apache.cassandra.transport.Server;
-import org.apache.cassandra.transport.SimpleClient;
-import org.apache.cassandra.transport.messages.QueryMessage;
-import org.apache.cassandra.transport.messages.RegisterMessage;
-
-public class TraceCompleteTest extends CQLTester
-{
- @Test
- public void testTraceComplete() throws Throwable
- {
- sessionNet(3);
-
- SimpleClient clientA = new SimpleClient(nativeAddr.getHostAddress(), nativePort);
- clientA.connect(false);
- try
- {
- SimpleClient.SimpleEventHandler eventHandlerA = new SimpleClient.SimpleEventHandler();
- clientA.setEventHandler(eventHandlerA);
-
- SimpleClient clientB = new SimpleClient(nativeAddr.getHostAddress(), nativePort);
- clientB.connect(false);
- try
- {
- SimpleClient.SimpleEventHandler eventHandlerB = new SimpleClient.SimpleEventHandler();
- clientB.setEventHandler(eventHandlerB);
-
- Message.Response resp = clientA.execute(new RegisterMessage(Collections.singletonList(Event.Type.TRACE_COMPLETE)));
- Assert.assertSame(Message.Type.READY, resp.type);
-
- createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
-
- QueryMessage query = new QueryMessage("SELECT * FROM " + KEYSPACE + '.' + currentTable(), QueryOptions.DEFAULT);
- query.setTracingRequested();
- resp = clientA.execute(query);
-
- Event event = eventHandlerA.queue.poll(250, TimeUnit.MILLISECONDS);
- Assert.assertNotNull(event);
-
- // assert that only the connection that started the trace receives the trace-complete event
- Assert.assertNull(eventHandlerB.queue.poll(100, TimeUnit.MILLISECONDS));
-
- Assert.assertSame(Event.Type.TRACE_COMPLETE, event.type);
- Assert.assertEquals(resp.getTracingId(), ((Event.TraceComplete) event).traceSessionId);
- }
- finally
- {
- clientB.close();
- }
- }
- finally
- {
- clientA.close();
- }
- }
-
- @Test
- public void testTraceCompleteVersion3() throws Throwable
- {
- sessionNet(3);
-
- SimpleClient clientA = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_3);
- clientA.connect(false);
- try
- {
- SimpleClient.SimpleEventHandler eventHandlerA = new SimpleClient.SimpleEventHandler();
- clientA.setEventHandler(eventHandlerA);
-
- try
- {
- clientA.execute(new RegisterMessage(Collections.singletonList(Event.Type.TRACE_COMPLETE)));
- Assert.fail();
- }
- catch (RuntimeException e)
- {
- Assert.assertTrue(e.getCause() instanceof ProtocolException); // that's what we want
- }
-
- createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
-
- QueryMessage query = new QueryMessage("SELECT * FROM " + KEYSPACE + '.' + currentTable(), QueryOptions.DEFAULT);
- query.setTracingRequested();
- clientA.execute(query);
-
- Event event = eventHandlerA.queue.poll(250, TimeUnit.MILLISECONDS);
- Assert.assertNull(event);
- }
- finally
- {
- clientA.close();
- }
- }
-
- @Test
- public void testTraceCompleteNotRegistered() throws Throwable
- {
- sessionNet(3);
-
- SimpleClient clientA = new SimpleClient(nativeAddr.getHostAddress(), nativePort);
- clientA.connect(false);
- try
- {
- SimpleClient.SimpleEventHandler eventHandlerA = new SimpleClient.SimpleEventHandler();
- clientA.setEventHandler(eventHandlerA);
-
- createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
-
- // check that we do NOT receive a trace-complete event, since we didn't register for that
-
- // with setTracingRequested()
- QueryMessage query = new QueryMessage("SELECT * FROM " + KEYSPACE + '.' + currentTable(), QueryOptions.DEFAULT);
- query.setTracingRequested();
- clientA.execute(query);
- // and without setTracingRequested()
- query = new QueryMessage("SELECT * FROM " + KEYSPACE + '.' + currentTable(), QueryOptions.DEFAULT);
- clientA.execute(query);
-
- Event event = eventHandlerA.queue.poll(250, TimeUnit.MILLISECONDS);
- Assert.assertNull(event);
- }
- finally
- {
- clientA.close();
- }
- }
-
- @Test
- public void testTraceCompleteWithProbability() throws Throwable
- {
- sessionNet(3);
-
- double traceProbability = StorageService.instance.getTraceProbability();
- // check for trace-probability in QueryState.traceNextQuery() is x<y, not x<=y
- StorageService.instance.setTraceProbability(1.1d);
-
- SimpleClient clientA = new SimpleClient(nativeAddr.getHostAddress(), nativePort);
- clientA.connect(false);
- try
- {
- SimpleClient.SimpleEventHandler eventHandlerA = new SimpleClient.SimpleEventHandler();
- clientA.setEventHandler(eventHandlerA);
-
- SimpleClient clientB = new SimpleClient(nativeAddr.getHostAddress(), nativePort);
- clientB.connect(false);
- try
- {
- SimpleClient.SimpleEventHandler eventHandlerB = new SimpleClient.SimpleEventHandler();
- clientB.setEventHandler(eventHandlerB);
-
- Message.Response resp = clientA.execute(new RegisterMessage(Collections.singletonList(Event.Type.TRACE_COMPLETE)));
- Assert.assertSame(Message.Type.READY, resp.type);
-
- createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
-
- QueryMessage query = new QueryMessage("SELECT * FROM " + KEYSPACE + '.' + currentTable(), QueryOptions.DEFAULT);
- clientA.execute(query);
-
- Event event = eventHandlerA.queue.poll(2000, TimeUnit.MILLISECONDS);
- Assert.assertNull(event);
-
- Assert.assertNull(eventHandlerB.queue.poll(100, TimeUnit.MILLISECONDS));
- }
- finally
- {
- clientB.close();
- }
- }
- finally
- {
- StorageService.instance.setTraceProbability(traceProbability);
- clientA.close();
- }
- }
-}