You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/01/08 21:29:24 UTC
[7/8] git commit: Merge branch 'cassandra-1.1' into cassandra-1.2
Merge branch 'cassandra-1.1' into cassandra-1.2
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
src/java/org/apache/cassandra/service/StorageService.java
src/java/org/apache/cassandra/service/StorageServiceMBean.java
src/java/org/apache/cassandra/tools/NodeCmd.java
src/java/org/apache/cassandra/tools/NodeProbe.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a96a8d41
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a96a8d41
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a96a8d41
Branch: refs/heads/cassandra-1.2
Commit: a96a8d41e3c3b902e9485c3c1067604dccf4b6fb
Parents: 9458530 0906b7c
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Jan 8 14:28:23 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jan 8 14:28:23 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 4 +-
.../io/compress/CompressedRandomAccessReader.java | 74 +++------
.../cassandra/service/AntiEntropyService.java | 10 +
.../apache/cassandra/service/StorageService.java | 137 ++++++++++-----
.../cassandra/service/StorageServiceMBean.java | 16 ++-
src/java/org/apache/cassandra/tools/NodeCmd.java | 28 ++--
src/java/org/apache/cassandra/tools/NodeProbe.java | 73 ++++++++
7 files changed, 229 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a96a8d41/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 16473ec,5e87435..37a47f3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,182 -1,34 +1,184 @@@
-1.1.9
+1.2.1
+ * disallow bloom filter false positive chance of 0 (CASSANDRA-5013)
+ * add threadpool size adjustment methods to JMXEnabledThreadPoolExecutor and
+ CompactionManagerMBean (CASSANDRA-5044)
- * Improve handling a changing target throttle rate mid-compaction (CASSANDRA-5087)
+ * fix hinting for dropped local writes (CASSANDRA-4753)
+ * off-heap cache doesn't need mutable column container (CASSANDRA-5057)
+ * apply disk_failure_policy to bad disks on initial directory creation
+ (CASSANDRA-4847)
+ * Optimize name-based queries to use ArrayBackedSortedColumns (CASSANDRA-5043)
+ * Fall back to old manifest if most recent is unparseable (CASSANDRA-5041)
+ * pool [Compressed]RandomAccessReader objects on the partitioned read path
+ (CASSANDRA-4942)
+ * Add debug logging to list filenames processed by Directories.migrateFile
+ method (CASSANDRA-4939)
+ * Expose black-listed directories via JMX (CASSANDRA-4848)
+ * Log compaction merge counts (CASSANDRA-4894)
+ * Minimize byte array allocation by AbstractData{Input,Output} (CASSANDRA-5090)
+ * Add SSL support for the binary protocol (CASSANDRA-5031)
+ * Allow non-schema system ks modification for shuffle to work (CASSANDRA-5097)
+ * cqlsh: Add default limit to SELECT statements (CASSANDRA-4972)
+ * cqlsh: fix DESCRIBE for 1.1 cfs in CQL3 (CASSANDRA-5101)
+ * Correctly gossip with nodes >= 1.1.7 (CASSANDRA-5102)
+ * Ensure CL guarantees on digest mismatch (CASSANDRA-5113)
+ * Validate correctly selects on composite partition key (CASSANDRA-5122)
+ * Fix exception when adding collection (CASSANDRA-5117)
+ * Handle states for non-vnode clusters correctly (CASSANDRA-5127)
+Merged from 1.1:
+ * Simplify CompressedRandomAccessReader to work around JDK FD bug (CASSANDRA-5088)
+ * Improve handling a changing target throttle rate mid-compaction (CASSANDRA-5087)
- * fix multithreaded compaction deadlock (CASSANDRA-4492)
- * fix specifying and altering crc_check_chance (CASSANDRA-5053)
- * Don't expire columns sooner than they should in 2ndary indexes (CASSANDRA-5079)
* Pig: correctly decode row keys in widerow mode (CASSANDRA-5098)
+ * nodetool repair command now prints progress (CASSANDRA-4767)
-1.1.8
- * reset getRangeSlice filter after finishing a row for get_paged_slice
- (CASSANDRA-4919)
+1.2.0
+ * Disallow counters in collections (CASSANDRA-5082)
+ * cqlsh: add unit tests (CASSANDRA-3920)
+ * fix default bloom_filter_fp_chance for LeveledCompactionStrategy (CASSANDRA-5093)
+
+
+1.2.0-rc2
+ * fix nodetool ownership display with vnodes (CASSANDRA-5065)
+ * cqlsh: add DESCRIBE KEYSPACES command (CASSANDRA-5060)
+ * Fix potential infinite loop when reloading CFS (CASSANDRA-5064)
+ * Fix SimpleAuthorizer example (CASSANDRA-5072)
+ * cqlsh: force CL.ONE for tracing and system.schema* queries (CASSANDRA-5070)
+ * Includes cassandra-shuffle in the debian package (CASSANDRA-5058)
+Merged from 1.1:
+ * fix multithreaded compaction deadlock (CASSANDRA-4492)
* fix temporarily missing schema after upgrade from pre-1.1.5 (CASSANDRA-5061)
+ * Fix ALTER TABLE overriding compression options with defaults
+ (CASSANDRA-4996, 5066)
+ * fix specifying and altering crc_check_chance (CASSANDRA-5053)
+ * fix Murmur3Partitioner ownership% calculation (CASSANDRA-5076)
+ * Don't expire columns sooner than they should in 2ndary indexes (CASSANDRA-5079)
+
+
+1.2-rc1
+ * rename rpc_timeout settings to request_timeout (CASSANDRA-5027)
+ * add BF with 0.1 FP to LCS by default (CASSANDRA-5029)
+ * Fix preparing insert queries (CASSANDRA-5016)
+ * Fix preparing queries with counter increment (CASSANDRA-5022)
+ * Fix preparing updates with collections (CASSANDRA-5017)
+ * Don't generate UUID based on other node address (CASSANDRA-5002)
+ * Fix message when trying to alter a clustering key type (CASSANDRA-5012)
+ * Update IAuthenticator to match the new IAuthorizer (CASSANDRA-5003)
+ * Fix inserting only a key in CQL3 (CASSANDRA-5040)
+ * Fix CQL3 token() function when used with strings (CASSANDRA-5050)
+Merged from 1.1:
* reduce log spam from invalid counter shards (CASSANDRA-5026)
* Improve schema propagation performance (CASSANDRA-5025)
- * Fall back to old describe_splits if d_s_ex is not available (CASSANDRA-4803)
- * Improve error reporting when streaming ranges fail (CASSANDRA-5009)
+ * Fix for IndexHelper.IndexFor throws OOB Exception (CASSANDRA-5030)
+ * cqlsh: make it possible to describe thrift CFs (CASSANDRA-4827)
* cqlsh: fix timestamp formatting on some platforms (CASSANDRA-5046)
- * Fix ALTER TABLE overriding compression options with defaults (CASSANDRA-4996, 5066)
- * Avoid error opening data file on startup (CASSANDRA-4984)
- * Fix wrong index_options in cli 'show schema' (CASSANDRA-5008)
- * Allow overriding number of available processor (CASSANDRA-4790)
-1.1.7
- * cqlsh: improve COPY FROM performance (CASSANDRA-4921)
+1.2-beta3
+ * make consistency level configurable in cqlsh (CASSANDRA-4829)
+ * fix cqlsh rendering of blob fields (CASSANDRA-4970)
+ * fix cqlsh DESCRIBE command (CASSANDRA-4913)
+ * save truncation position in system table (CASSANDRA-4906)
+ * Move CompressionMetadata off-heap (CASSANDRA-4937)
+ * allow CLI to GET cql3 columnfamily data (CASSANDRA-4924)
+ * Fix rare race condition in getExpireTimeForEndpoint (CASSANDRA-4402)
+ * acquire references to overlapping sstables during compaction so bloom filter
+ doesn't get free'd prematurely (CASSANDRA-4934)
+ * Don't share slice query filter in CQL3 SelectStatement (CASSANDRA-4928)
+ * Separate tracing from Log4J (CASSANDRA-4861)
+ * Exclude gcable tombstones from merkle-tree computation (CASSANDRA-4905)
+ * Better printing of AbstractBounds for tracing (CASSANDRA-4931)
+ * Optimize mostRecentTombstone check in CC.collectAllData (CASSANDRA-4883)
+ * Change stream session ID to UUID to avoid collision from same node (CASSANDRA-4813)
+ * Use Stats.db when bulk loading if present (CASSANDRA-4957)
+ * Skip repair on system_trace and keyspaces with RF=1 (CASSANDRA-4956)
+ * (cql3) Remove arbitrary SELECT limit (CASSANDRA-4918)
+ * Correctly handle prepared operation on collections (CASSANDRA-4945)
+ * Fix CQL3 LIMIT (CASSANDRA-4877)
+ * Fix Stress for CQL3 (CASSANDRA-4979)
+ * Remove cassandra specific exceptions from JMX interface (CASSANDRA-4893)
+ * (CQL3) Force using ALLOW FILTERING on potentially inefficient queries (CASSANDRA-4915)
+ * (cql3) Fix adding column when the table has collections (CASSANDRA-4982)
+ * (cql3) Fix allowing collections with compact storage (CASSANDRA-4990)
+ * (cql3) Refuse ttl/writetime function on collections (CASSANDRA-4992)
+ * Replace IAuthority with new IAuthorizer (CASSANDRA-4874)
+ * clqsh: fix KEY pseudocolumn escaping when describing Thrift tables
+ in CQL3 mode (CASSANDRA-4955)
* add basic authentication support for Pig CassandraStorage (CASSANDRA-3042)
* fix CQL2 ALTER TABLE compaction_strategy_class altering (CASSANDRA-4965)
+Merged from 1.1:
+ * Fall back to old describe_splits if d_s_ex is not available (CASSANDRA-4803)
+ * Improve error reporting when streaming ranges fail (CASSANDRA-5009)
+ * Fix cqlsh timestamp formatting of timezone info (CASSANDRA-4746)
+ * Fix assertion failure with leveled compaction (CASSANDRA-4799)
+ * Check for null end_token in get_range_slice (CASSANDRA-4804)
+ * Remove all remnants of removed nodes (CASSANDRA-4840)
+ * Add aut-reloading of the log4j file in debian package (CASSANDRA-4855)
+ * Fix estimated row cache entry size (CASSANDRA-4860)
+ * reset getRangeSlice filter after finishing a row for get_paged_slice
+ (CASSANDRA-4919)
* expunge row cache post-truncate (CASSANDRA-4940)
- * remove IAuthority2 (CASSANDRA-4875)
+ * Allow static CF definition with compact storage (CASSANDRA-4910)
+ * Fix endless loop/compaction of schema_* CFs due to broken timestamps (CASSANDRA-4880)
+ * Fix 'wrong class type' assertion in CounterColumn (CASSANDRA-4976)
+
+
+1.2-beta2
+ * fp rate of 1.0 disables BF entirely; LCS defaults to 1.0 (CASSANDRA-4876)
+ * off-heap bloom filters for row keys (CASSANDRA_4865)
+ * add extension point for sstable components (CASSANDRA-4049)
+ * improve tracing output (CASSANDRA-4852, 4862)
+ * make TRACE verb droppable (CASSANDRA-4672)
+ * fix BulkLoader recognition of CQL3 columnfamilies (CASSANDRA-4755)
+ * Sort commitlog segments for replay by id instead of mtime (CASSANDRA-4793)
+ * Make hint delivery asynchronous (CASSANDRA-4761)
+ * Pluggable Thrift transport factories for CLI and cqlsh (CASSANDRA-4609, 4610)
+ * cassandra-cli: allow Double value type to be inserted to a column (CASSANDRA-4661)
+ * Add ability to use custom TServerFactory implementations (CASSANDRA-4608)
+ * optimize batchlog flushing to skip successful batches (CASSANDRA-4667)
+ * include metadata for system keyspace itself in schema tables (CASSANDRA-4416)
+ * add check to PropertyFileSnitch to verify presence of location for
+ local node (CASSANDRA-4728)
+ * add PBSPredictor consistency modeler (CASSANDRA-4261)
+ * remove vestiges of Thrift unframed mode (CASSANDRA-4729)
+ * optimize single-row PK lookups (CASSANDRA-4710)
+ * adjust blockFor calculation to account for pending ranges due to node
+ movement (CASSANDRA-833)
+ * Change CQL version to 3.0.0 and stop accepting 3.0.0-beta1 (CASSANDRA-4649)
+ * (CQL3) Make prepared statement global instead of per connection
+ (CASSANDRA-4449)
+ * Fix scrubbing of CQL3 created tables (CASSANDRA-4685)
+ * (CQL3) Fix validation when using counter and regular columns in the same
+ table (CASSANDRA-4706)
+ * Fix bug starting Cassandra with simple authentication (CASSANDRA-4648)
+ * Add support for batchlog in CQL3 (CASSANDRA-4545, 4738)
+ * Add support for multiple column family outputs in CFOF (CASSANDRA-4208)
+ * Support repairing only the local DC nodes (CASSANDRA-4747)
+ * Use rpc_address for binary protocol and change default port (CASSANRA-4751)
+ * Fix use of collections in prepared statements (CASSANDRA-4739)
+ * Store more information into peers table (CASSANDRA-4351, 4814)
+ * Configurable bucket size for size tiered compaction (CASSANDRA-4704)
+ * Run leveled compaction in parallel (CASSANDRA-4310)
+ * Fix potential NPE during CFS reload (CASSANDRA-4786)
+ * Composite indexes may miss results (CASSANDRA-4796)
+ * Move consistency level to the protocol level (CASSANDRA-4734, 4824)
+ * Fix Subcolumn slice ends not respected (CASSANDRA-4826)
+ * Fix Assertion error in cql3 select (CASSANDRA-4783)
+ * Fix list prepend logic (CQL3) (CASSANDRA-4835)
+ * Add booleans as literals in CQL3 (CASSANDRA-4776)
+ * Allow renaming PK columns in CQL3 (CASSANDRA-4822)
+ * Fix binary protocol NEW_NODE event (CASSANDRA-4679)
+ * Fix potential infinite loop in tombstone compaction (CASSANDRA-4781)
+ * Remove system tables accounting from schema (CASSANDRA-4850)
+ * (cql3) Force provided columns in clustering key order in
+ 'CLUSTERING ORDER BY' (CASSANDRA-4881)
+ * Fix composite index bug (CASSANDRA-4884)
+ * Fix short read protection for CQL3 (CASSANDRA-4882)
+ * Add tracing support to the binary protocol (CASSANDRA-4699)
+ * (cql3) Don't allow prepared marker inside collections (CASSANDRA-4890)
+ * Re-allow order by on non-selected columns (CASSANDRA-4645)
+ * Bug when composite index is created in a table having collections (CASSANDRA-4909)
+ * log index scan subject in CompositesSearcher (CASSANDRA-4904)
+Merged from 1.1:
* add get[Row|Key]CacheEntries to CacheServiceMBean (CASSANDRA-4859)
* fix get_paged_slice to wrap to next row correctly (CASSANDRA-4816)
* fix indexing empty column values (CASSANDRA-4832)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a96a8d41/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index dc6b0df,a5faff1..bbd2466
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@@ -35,31 -26,25 +32,34 @@@ import org.apache.cassandra.io.util.Poo
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.FBUtilities;
- // TODO refactor this to separate concept of "buffer to avoid lots of read() syscalls" and "compression buffer"
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+ /**
+ * CRAR extends RAR to transparently uncompress blocks from the file into RAR.buffer. Most of the RAR
+ * "read bytes from the buffer, rebuffering when necessary" machinery works unchanged after that.
+ */
public class CompressedRandomAccessReader extends RandomAccessReader
{
- private static final Logger logger = LoggerFactory.getLogger(CompressedRandomAccessReader.class);
-
- public static RandomAccessReader open(String dataFilePath, CompressionMetadata metadata) throws IOException
+ public static CompressedRandomAccessReader open(String path, CompressionMetadata metadata, CompressedSegmentedFile owner)
{
- return open(dataFilePath, metadata, false);
+ try
+ {
+ return new CompressedRandomAccessReader(path, metadata, false, owner);
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- public static RandomAccessReader open(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache) throws IOException
+ public static CompressedRandomAccessReader open(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache)
{
- return new CompressedRandomAccessReader(dataFilePath, metadata, skipIOCache);
+ try
+ {
+ return new CompressedRandomAccessReader(dataFilePath, metadata, skipIOCache, null);
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
}
private final CompressionMetadata metadata;
@@@ -70,30 -56,13 +71,13 @@@
private final Checksum checksum = new CRC32();
// raw checksum bytes
- private final byte[] checksumBytes = new byte[4];
-
- private final FileInputStream source;
- private final FileChannel channel;
+ private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]);
- public CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache) throws IOException
+ private CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache, PoolingSegmentedFile owner) throws FileNotFoundException
{
- super(new File(dataFilePath), metadata.chunkLength(), skipIOCache);
+ super(new File(dataFilePath), metadata.chunkLength(), skipIOCache, owner);
this.metadata = metadata;
- compressed = new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())];
- // can't use super.read(...) methods
- // that is why we are allocating special InputStream to read data from disk
- // from already open file descriptor
- try
- {
- source = new FileInputStream(getFD());
- }
- catch (IOException e)
- {
- // fd == null, Not Supposed To Happen
- throw new RuntimeException(e);
- }
-
- channel = source.getChannel(); // for position manipulation
+ compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
}
@Override
@@@ -118,20 -76,18 +102,26 @@@
if (channel.position() != chunk.offset)
channel.position(chunk.offset);
- if (compressed.length < chunk.length)
- compressed = new byte[chunk.length];
+ if (compressed.capacity() < chunk.length)
+ compressed = ByteBuffer.wrap(new byte[chunk.length]);
+ else
+ compressed.clear();
+ compressed.limit(chunk.length);
- if (source.read(compressed, 0, chunk.length) != chunk.length)
+ if (channel.read(compressed) != chunk.length)
- throw new IOException(String.format("(%s) failed to read %d bytes from offset %d.", getPath(), chunk.length, chunk.offset));
+ throw new CorruptBlockException(getPath(), chunk);
+
+ // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
+ // in the future this will save a lot of hair-pulling
+ compressed.flip();
- validBufferBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 0);
+ try
+ {
- validBufferBytes = metadata.compressor().uncompress(compressed, 0, chunk.length, buffer, 0);
++ validBufferBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 0);
+ }
+ catch (IOException e)
+ {
+ throw new CorruptBlockException(getPath(), chunk);
+ }
if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble())
{
@@@ -151,11 -107,13 +141,10 @@@
private int checksum(CompressionMetadata.Chunk chunk) throws IOException
{
assert channel.position() == chunk.offset + chunk.length;
-
- if (source.read(checksumBytes, 0, checksumBytes.length) != checksumBytes.length)
+ checksumBytes.clear();
+ if (channel.read(checksumBytes) != checksumBytes.capacity())
- throw new IOException(String.format("(%s) failed to read checksum of the chunk at %d of length %d.",
- getPath(),
- chunk.offset,
- chunk.length));
+ throw new CorruptBlockException(getPath(), chunk);
-
- return Ints.fromByteArray(checksumBytes);
+ return checksumBytes.getInt(0);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a96a8d41/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a96a8d41/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index b9e4c21,ad05ce2..15339c4
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -26,12 -26,20 +26,15 @@@ import java.net.InetAddress
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
+ import javax.management.Notification;
+ import javax.management.NotificationBroadcasterSupport;
import javax.management.ObjectName;
-import com.google.common.base.Supplier;
import com.google.common.collect.*;
-
-import org.apache.cassandra.metrics.ClientRequestMetrics;
-
import org.apache.log4j.Level;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@@ -79,12 -81,90 +82,15 @@@ import static com.google.common.base.Ch
* This class will also maintain histograms of the load information
* of other nodes in the cluster.
*/
- public class StorageService implements IEndpointStateChangeSubscriber, StorageServiceMBean
+ public class StorageService extends NotificationBroadcasterSupport implements IEndpointStateChangeSubscriber, StorageServiceMBean
{
- private static Logger logger_ = LoggerFactory.getLogger(StorageService.class);
+ private static final Logger logger = LoggerFactory.getLogger(StorageService.class);
public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stablized
+ /* JMX notification serial number counter */
+ private final AtomicLong notificationSerialNumber = new AtomicLong();
+
- /* All verb handler identifiers */
- public enum Verb
- {
- MUTATION,
- BINARY, // Deprecated
- READ_REPAIR,
- READ,
- REQUEST_RESPONSE, // client-initiated reads and writes
- STREAM_INITIATE, // Deprecated
- STREAM_INITIATE_DONE, // Deprecated
- STREAM_REPLY,
- STREAM_REQUEST,
- RANGE_SLICE,
- BOOTSTRAP_TOKEN,
- TREE_REQUEST,
- TREE_RESPONSE,
- JOIN, // Deprecated
- GOSSIP_DIGEST_SYN,
- GOSSIP_DIGEST_ACK,
- GOSSIP_DIGEST_ACK2,
- DEFINITIONS_ANNOUNCE, // Deprecated
- DEFINITIONS_UPDATE,
- TRUNCATE,
- SCHEMA_CHECK,
- INDEX_SCAN, // Deprecated
- REPLICATION_FINISHED,
- INTERNAL_RESPONSE, // responses to internal calls
- COUNTER_MUTATION,
- STREAMING_REPAIR_REQUEST,
- STREAMING_REPAIR_RESPONSE,
- SNAPSHOT, // Similar to nt snapshot
- MIGRATION_REQUEST,
- GOSSIP_SHUTDOWN,
- // use as padding for backwards compatability where a previous version needs to validate a verb from the future.
- UNUSED_1,
- UNUSED_2,
- UNUSED_3,
- ;
- // remember to add new verbs at the end, since we serialize by ordinal
- }
- public static final Verb[] VERBS = Verb.values();
-
- public static final EnumMap<StorageService.Verb, Stage> verbStages = new EnumMap<StorageService.Verb, Stage>(StorageService.Verb.class)
- {{
- put(Verb.MUTATION, Stage.MUTATION);
- put(Verb.BINARY, Stage.MUTATION);
- put(Verb.READ_REPAIR, Stage.MUTATION);
- put(Verb.TRUNCATE, Stage.MUTATION);
- put(Verb.READ, Stage.READ);
- put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE);
- put(Verb.STREAM_REPLY, Stage.MISC); // TODO does this really belong on misc? I've just copied old behavior here
- put(Verb.STREAM_REQUEST, Stage.STREAM);
- put(Verb.RANGE_SLICE, Stage.READ);
- put(Verb.BOOTSTRAP_TOKEN, Stage.MISC);
- put(Verb.TREE_REQUEST, Stage.ANTI_ENTROPY);
- put(Verb.TREE_RESPONSE, Stage.ANTI_ENTROPY);
- put(Verb.STREAMING_REPAIR_REQUEST, Stage.ANTI_ENTROPY);
- put(Verb.STREAMING_REPAIR_RESPONSE, Stage.ANTI_ENTROPY);
- put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
- put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
- put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
- put(Verb.GOSSIP_SHUTDOWN, Stage.GOSSIP);
- put(Verb.DEFINITIONS_UPDATE, Stage.MIGRATION);
- put(Verb.SCHEMA_CHECK, Stage.MIGRATION);
- put(Verb.MIGRATION_REQUEST, Stage.MIGRATION);
- put(Verb.INDEX_SCAN, Stage.READ);
- put(Verb.REPLICATION_FINISHED, Stage.MISC);
- put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
- put(Verb.COUNTER_MUTATION, Stage.MUTATION);
- put(Verb.SNAPSHOT, Stage.MISC);
- put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
- put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
- put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
- }};
-
private static int getRingDelay()
{
String newdelay = System.getProperty("cassandra.ring_delay_ms");
@@@ -180,10 -250,8 +186,12 @@@
private static final AtomicInteger nextRepairCommand = new AtomicInteger();
+ private static ScheduledRangeTransferExecutorService rangeXferExecutor = new ScheduledRangeTransferExecutorService();
+
+ private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<IEndpointLifecycleSubscriber>();
+
+ private final ObjectName jmxObjectName;
+
public void finishBootstrapping()
{
isBootstrapMode = false;
@@@ -2225,6 -1944,82 +2234,34 @@@
}
/**
+ * Sends JMX notification to subscribers.
+ *
+ * @param type Message type
+ * @param message Message itself
+ * @param userObject Arbitrary object to attach to notification
+ */
+ public void sendNotification(String type, String message, Object userObject)
+ {
+ Notification jmxNotification = new Notification(type, jmxObjectName, notificationSerialNumber.incrementAndGet(), message);
+ jmxNotification.setUserData(userObject);
+ sendNotification(jmxNotification);
+ }
+
- public int forceRepairAsync(final String tableName, final boolean isSequential, final boolean primaryRange, final String... columnFamilies)
++ public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies)
+ {
- if (Table.SYSTEM_TABLE.equals(tableName))
++ if (Table.SYSTEM_KS.equals(keyspace) || Tracing.TRACE_KS.equals(keyspace) || Auth.AUTH_KS.equals(keyspace))
+ return 0;
+
+ final int cmd = nextRepairCommand.incrementAndGet();
- final Collection<Range<Token>> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(tableName);
++ final Collection<Range<Token>> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(keyspace);
+ if (ranges.size() > 0)
+ {
- new Thread(new WrappedRunnable()
- {
- protected void runMayThrow() throws Exception
- {
- String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s", cmd, ranges.size(), tableName);
- logger_.info(message);
- sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.STARTED.ordinal()});
-
- List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
- for (Range<Token> range : ranges)
- {
- AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies);
- if (future == null)
- continue;
- futures.add(future);
- // wait for a session to be done with its differencing before starting the next one
- try
- {
- future.session.differencingDone.await();
- }
- catch (InterruptedException e)
- {
- message = "Interrupted while waiting for the differencing of repair session " + future.session + " to be done. Repair may be imprecise.";
- logger_.error(message, e);
- sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
- }
- }
- for (AntiEntropyService.RepairFuture future : futures)
- {
- try
- {
- future.get();
- message = String.format("Repair session %s for range %s finished", future.session.getName(), future.session.getRange().toString());
- sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_SUCCESS.ordinal()});
- }
- catch (ExecutionException e)
- {
- message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getCause().getMessage());
- sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
- }
- catch (Exception e)
- {
- message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getMessage());
- sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
- }
- }
- sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, AntiEntropyService.Status.FINISHED.ordinal()});
- }
- }).start();
++ new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, isLocal, columnFamilies)).start();
+ }
+ return cmd;
+ }
+
+ /**
* Trigger proactive repair for a table and column families.
* @param tableName
* @param columnFamilies
@@@ -2245,65 -2098,25 +2282,75 @@@
Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
- logger_.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
- new Object[] {parsedBeginToken, parsedEndToken, tableName, columnFamilies});
- AntiEntropyService.RepairFuture future = forceTableRepair(new Range<Token>(parsedBeginToken, parsedEndToken), tableName, isSequential, columnFamilies);
- if (future == null)
+ logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
+ parsedBeginToken, parsedEndToken, tableName, columnFamilies);
+ forceTableRepairRange(tableName, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), isSequential, isLocal, columnFamilies);
+ }
+
+ public void forceTableRepairRange(final String tableName, final Collection<Range<Token>> ranges, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
+ {
- if (Table.SYSTEM_KS.equals(tableName) || Tracing.TRACE_KS.equals(tableName))
++ if (Table.SYSTEM_KS.equals(tableName) || Tracing.TRACE_KS.equals(tableName) || Auth.AUTH_KS.equals(tableName))
return;
- try
- {
- future.get();
- }
- catch (Exception e)
++ createRepairTask(nextRepairCommand.incrementAndGet(), tableName, ranges, isSequential, isLocal, columnFamilies).run();
++ }
+
- int cmd = nextRepairCommand.incrementAndGet();
- logger.info("Starting repair command #{}, repairing {} ranges.", cmd, ranges.size());
-
- List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
- for (Range<Token> range : ranges)
++ private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final boolean isLocal, final String... columnFamilies)
++ {
++ FutureTask<Object> task = new FutureTask<Object>(new WrappedRunnable()
{
- AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, isLocal, columnFamilies);
- if (future == null)
- continue;
- futures.add(future);
- // wait for a session to be done with its differencing before starting the next one
- try
- {
- future.session.differencingDone.await();
- }
- catch (InterruptedException e)
- logger_.error("Repair session " + future.session.getName() + " failed.", e);
- }
++ protected void runMayThrow() throws Exception
+ {
- logger.error("Interrupted while waiting for the differencing of repair session " + future.session + " to be done. Repair may be imprecise.", e);
- }
- }
- if (futures.isEmpty())
- {
- logger.info("Nothing to repair on {} for command #{}", tableName, cmd);
- return;
- }
++ String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s", cmd, ranges.size(), keyspace);
++ logger.info(message);
++ sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.STARTED.ordinal()});
+
- boolean failedSession = false;
-
- // block until all repair sessions have completed
- for (AntiEntropyService.RepairFuture future : futures)
- {
- try
- {
- future.get();
- }
- catch (Exception e)
- {
- logger.error("Repair session " + future.session.getName() + " failed.", e);
- failedSession = true;
++ List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
++ for (Range<Token> range : ranges)
++ {
++ AntiEntropyService.RepairFuture future = forceTableRepair(range, keyspace, isSequential, isLocal, columnFamilies);
++ if (future == null)
++ continue;
++ futures.add(future);
++ // wait for a session to be done with its differencing before starting the next one
++ try
++ {
++ future.session.differencingDone.await();
++ }
++ catch (InterruptedException e)
++ {
++ message = "Interrupted while waiting for the differencing of repair session " + future.session + " to be done. Repair may be imprecise.";
++ logger.error(message, e);
++ sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
++ }
++ }
++ for (AntiEntropyService.RepairFuture future : futures)
++ {
++ try
++ {
++ future.get();
++ message = String.format("Repair session %s for range %s finished", future.session.getName(), future.session.getRange().toString());
++ sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_SUCCESS.ordinal()});
++ }
++ catch (ExecutionException e)
++ {
++ message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getCause().getMessage());
++ logger.error(message, e);
++ sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
++ }
++ catch (Exception e)
++ {
++ message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getMessage());
++ logger.error(message, e);
++ sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
++ }
++ }
++ sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, AntiEntropyService.Status.FINISHED.ordinal()});
+ }
- }
-
- if (failedSession)
- throw new IOException("Repair command #" + cmd + ": some repair session(s) failed (see log for details).");
- else
- logger.info("Repair command #{} completed successfully", cmd);
++ }, null);
++ return task;
}
- public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
+ public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
{
ArrayList<String> names = new ArrayList<String>();
for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a96a8d41/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 0176b1d,c34faf3..067d08a
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -27,7 -26,14 +27,9 @@@ import java.util.Map
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
- public interface StorageServiceMBean
+ import javax.management.NotificationEmitter;
+
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.UnavailableException;
-
-
+ public interface StorageServiceMBean extends NotificationEmitter
{
/**
* Retrieve the list of live nodes in the cluster, where "liveness" is
@@@ -252,6 -244,18 +254,18 @@@
public void forceTableFlush(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
/**
+ * Invoke repair asynchronously.
+ * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean.
+ * Notification format is:
+ * type: "repair"
+ * userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
+ *
+ * @return Repair command number, or 0 if nothing to repair
- * @see #forceTableRepair(String, boolean, String...)
++ * @see #forceTableRepair(String, boolean, boolean, String...)
+ */
- public int forceRepairAsync(String tableName, boolean isSequential, boolean primaryRange, String... columnFamilies);
++ public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies);
+
+ /**
* Triggers proactive repair for given column families, or all columnfamilies for the given table
* if none are explicitly listed.
* @param tableName
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a96a8d41/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeCmd.java
index ecd9e3c,8d4f9a1..d72f3d3
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@@ -25,35 -28,23 +25,33 @@@ import java.net.ConnectException
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.DecimalFormat;
- import java.util.concurrent.ExecutionException;
- import java.util.Map.Entry;
import java.util.*;
+ import java.util.Map.Entry;
+ import java.util.concurrent.ExecutionException;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Maps;
import org.apache.commons.cli.*;
-
- import org.apache.cassandra.service.CacheServiceMBean;
- import org.apache.cassandra.service.PBSPredictionResult;
- import org.apache.cassandra.service.PBSPredictorMBean;
- import org.apache.cassandra.service.StorageProxyMBean;
++import org.yaml.snakeyaml.Loader;
++import org.yaml.snakeyaml.TypeDescription;
++import org.yaml.snakeyaml.Yaml;
++import org.yaml.snakeyaml.constructor.Constructor;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
-import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.compaction.CompactionManagerMBean;
import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.exceptions.ConfigurationException;
- import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
import org.apache.cassandra.net.MessagingServiceMBean;
+ import org.apache.cassandra.service.CacheServiceMBean;
++import org.apache.cassandra.service.PBSPredictionResult;
++import org.apache.cassandra.service.PBSPredictorMBean;
+ import org.apache.cassandra.service.StorageProxyMBean;
-import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.Pair;
- import org.yaml.snakeyaml.Loader;
- import org.yaml.snakeyaml.TypeDescription;
- import org.yaml.snakeyaml.Yaml;
- import org.yaml.snakeyaml.constructor.Constructor;
public class NodeCmd
{
@@@ -1334,11 -1040,8 +1332,9 @@@
{
case REPAIR :
boolean snapshot = cmd.hasOption(SNAPSHOT_REPAIR_OPT.left);
+ boolean localDC = cmd.hasOption(LOCAL_DC_REPAIR_OPT.left);
- if (cmd.hasOption(PRIMARY_RANGE_OPT.left))
- probe.forceTableRepairPrimaryRange(keyspace, snapshot, localDC, columnFamilies);
- else
- probe.forceTableRepair(keyspace, snapshot, localDC, columnFamilies);
+ boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left);
- probe.forceRepairAsync(System.out, keyspace, snapshot, primaryRange, columnFamilies);
++ probe.forceRepairAsync(System.out, keyspace, snapshot, localDC, primaryRange, columnFamilies);
break;
case FLUSH :
try { probe.forceTableFlush(keyspace, columnFamilies); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a96a8d41/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java
index 261374f,264ea90..021a40f
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@@ -48,6 -52,9 +50,7 @@@ import org.apache.cassandra.net.Messagi
import org.apache.cassandra.service.*;
import org.apache.cassandra.streaming.StreamingService;
import org.apache.cassandra.streaming.StreamingServiceMBean;
-import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.UnavailableException;
+ import org.apache.cassandra.utils.SimpleCondition;
/**
* JMX client operations for Cassandra.
@@@ -198,19 -202,41 +201,41 @@@ public class NodeProb
ssProxy.forceTableFlush(tableName, columnFamilies);
}
- public void forceTableRepair(String tableName, boolean isSequential, String... columnFamilies) throws IOException
+ public void forceTableRepair(String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
{
- ssProxy.forceTableRepair(tableName, isSequential, columnFamilies);
+ ssProxy.forceTableRepair(tableName, isSequential, isLocal, columnFamilies);
}
- public void forceRepairAsync(final PrintStream out, final String tableName, boolean isSequential, boolean primaryRange, String... columnFamilies) throws IOException
++ public void forceRepairAsync(final PrintStream out, final String tableName, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies) throws IOException
+ {
+ RepairRunner runner = new RepairRunner(out, tableName, columnFamilies);
+ try
+ {
+ ssProxy.addNotificationListener(runner, null, null);
- runner.repairAndWait(ssProxy, isSequential, primaryRange);
++ runner.repairAndWait(ssProxy, isSequential, isLocal, primaryRange);
+ }
+ catch (Exception e)
+ {
+ throw new IOException(e) ;
+ }
+ finally
+ {
+ try
+ {
+ ssProxy.removeNotificationListener(runner);
+ }
+ catch (ListenerNotFoundException ignored) {}
+ }
+ }
+
- public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException
+ public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
{
- ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies);
+ ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, isLocal, columnFamilies);
}
- public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, String... columnFamilies) throws IOException
+ public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
{
- ssProxy.forceTableRepairRange(beginToken, endToken, tableName, isSequential, columnFamilies);
+ ssProxy.forceTableRepairRange(beginToken, endToken, tableName, isSequential, isLocal, columnFamilies);
}
public void invalidateKeyCache() throws IOException
@@@ -797,3 -795,52 +822,51 @@@ class ThreadPoolProxyMBeanIterator impl
throw new UnsupportedOperationException();
}
}
+
+ class RepairRunner implements NotificationListener
+ {
+ private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ private final Condition condition = new SimpleCondition();
+ private final PrintStream out;
+ private final String keyspace;
+ private final String[] columnFamilies;
+ private int cmd;
+
+ RepairRunner(PrintStream out, String keyspace, String... columnFamilies)
+ {
+ this.out = out;
+ this.keyspace = keyspace;
+ this.columnFamilies = columnFamilies;
+ }
+
- public void repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean primaryRangeOnly) throws InterruptedException
++ public void repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, boolean primaryRangeOnly) throws InterruptedException
+ {
- cmd = ssProxy.forceRepairAsync(keyspace, isSequential, primaryRangeOnly, columnFamilies);
++ cmd = ssProxy.forceRepairAsync(keyspace, isSequential, isLocal, primaryRangeOnly, columnFamilies);
+ if (cmd > 0)
+ {
+ condition.await();
+ }
+ else
+ {
+ String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
+ out.println(message);
+ }
+ }
+
+ public void handleNotification(Notification notification, Object handback)
+ {
+ if ("repair".equals(notification.getType()))
+ {
- // repair status is int array with [0] = cmd number, [1] = status
+ int[] status = (int[]) notification.getUserData();
+ assert status.length == 2;
- // we only output what we invoked
+ if (cmd == status[0])
+ {
+ String message = String.format("[%s] %s", format.format(notification.getTimeStamp()), notification.getMessage());
+ out.println(message);
++ // repair status is int array with [0] = cmd number, [1] = status
+ if (status[1] == AntiEntropyService.Status.FINISHED.ordinal())
+ condition.signalAll();
+ }
+ }
+ }
+ }