Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/07/06 18:32:39 UTC
[01/16] cassandra git commit: Range tombstones that are masked by row tombstones should not be written out
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 43c741e25 -> 00e7ecf13
refs/heads/cassandra-3.0 9ed3b42d3 -> 778f2a46e
refs/heads/cassandra-3.9 5ad17634a -> 59ee46e55
refs/heads/trunk b4133f38d -> 9fd607778
Range tombstones that are masked by row tombstones should not be written out
patch by Nachiket Patil; reviewed by Sylvain Lebresne for CASSANDRA-12030
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/98f5f77b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/98f5f77b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/98f5f77b
Branch: refs/heads/cassandra-3.9
Commit: 98f5f77bb3c5d50e52cbb6f577a463ca8a5134ad
Parents: 3c1653f
Author: Nachiket Patil <na...@apple.com>
Authored: Wed Jul 6 11:22:56 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jul 6 14:35:10 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/LazilyCompactedRow.java | 3 +-
.../apache/cassandra/db/RangeTombstoneTest.java | 40 ++++++++++++++++++++
3 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98f5f77b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b1dcbe1..7fa995d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.16
+ * Don't write shadowed range tombstone (CASSANDRA-12030)
* Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
* Reduce contention getting instances of CompositeType (CASSANDRA-10433)
* Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98f5f77b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index f912da2..dab5eeb 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -286,7 +286,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow
RangeTombstone t = tombstone;
tombstone = null;
- if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp())
+ if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp() ||
+ maxRowTombstone.markedForDeleteAt >= t.timestamp())
{
indexBuilder.tombstoneTracker().update(t, true);
return null;
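For illustration, here is a minimal standalone Java sketch of the purge decision in the patched condition above. The class and method names are hypothetical stand-ins, not Cassandra code; only the boolean logic mirrors the diff. A range tombstone may now be dropped either when it is gcable and purgeable, or when the partition-level row tombstone is at least as new, since the row tombstone already shadows it ('&&' binds tighter than '||' in Java, so no extra parentheses are needed):

// ShadowedRangeTombstoneSketch.java -- hypothetical illustration only
public class ShadowedRangeTombstoneSketch
{
    static boolean shouldDropRangeTombstone(long rangeTombstoneTimestamp,
                                            boolean gcAble,
                                            long maxPurgeableTimestamp,
                                            long rowTombstoneMarkedForDeleteAt)
    {
        // Mirrors the patched condition in LazilyCompactedRow:
        // drop if (gcable and purgeable) OR shadowed by the row tombstone.
        return gcAble && rangeTombstoneTimestamp < maxPurgeableTimestamp
               || rowTombstoneMarkedForDeleteAt >= rangeTombstoneTimestamp;
    }

    public static void main(String[] args)
    {
        // Row tombstone written after the range tombstone: shadowed, droppable.
        System.out.println(shouldDropRangeTombstone(10L, false, 0L, 15L)); // true
        // Range tombstone newer than the row tombstone and not gcable: keep it.
        System.out.println(shouldDropRangeTombstone(20L, false, 0L, 15L)); // false
    }
}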
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98f5f77b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index 3292422..dfd6960 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.IndexType;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.CellNames;
@@ -543,6 +544,45 @@ public class RangeTombstoneTest extends SchemaLoader
}
@Test
+ public void testCompactionOfRangeTombstonesCoveredByRowTombstone() throws Exception
+ {
+ long testTimeStamp = 1451606400L; // 01/01/2016 : 00:00:00 GMT
+ Keyspace table = Keyspace.open(KSNAME);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME);
+ ByteBuffer key = ByteBufferUtil.bytes("k4");
+
+ // remove any existing sstables before starting
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+ cfs.setCompactionStrategyClass(LeveledCompactionStrategy.class.getCanonicalName());
+
+ Mutation rm = new Mutation(KSNAME, key);
+ for (int i = 1; i < 11; i += 2, testTimeStamp += i * 10)
+ add(rm, i, testTimeStamp);
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ rm = new Mutation(KSNAME, key);
+ ColumnFamily cf = rm.addOrGet(CFNAME);
+
+ // Write the covering row tombstone
+ cf.delete(new DeletionTime(++testTimeStamp, (int) testTimeStamp));
+
+ // Create range tombstones covered by row tombstone above.
+ for (int i = 1; i < 11; i += 2, testTimeStamp -= i * 5)
+ delete(cf, 0, 7, testTimeStamp);
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ // there should be 2 sstables
+ assertEquals(2, cfs.getSSTables().size());
+
+ // compact down to nothing
+ CompactionManager.instance.performMaximal(cfs);
+ assertEquals(0, cfs.getSSTables().size());
+ }
+
+ @Test
public void testOverwritesToDeletedColumns() throws Exception
{
Keyspace table = Keyspace.open(KSNAME);
[09/16] cassandra git commit: Improve streaming synchronization and fault tolerance
Posted by yu...@apache.org.
Improve streaming synchronization and fault tolerance
Patch by Paulo Motta; Reviewed by yukim for CASSANDRA-11414
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/00e7ecf1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/00e7ecf1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/00e7ecf1
Branch: refs/heads/cassandra-3.9
Commit: 00e7ecf1394f8704e2f13369f7950e129459ce2c
Parents: 43c741e
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Jul 6 12:16:16 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:32:39 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/streaming/ConnectionHandler.java | 8 +++-----
.../org/apache/cassandra/streaming/StreamReceiveTask.java | 2 --
3 files changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bfd8aa2..7d62f97 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.8
+ * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
* MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
Merged from 2.1:
* Don't write shadowed range tombstone (CASSANDRA-12030)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index c497a39..364435e 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -233,6 +233,9 @@ public class ConnectionHandler
protected void signalCloseDone()
{
+ if (closeFuture == null)
+ close();
+
closeFuture.get().set(null);
// We can now close the socket
@@ -294,11 +297,6 @@ public class ConnectionHandler
}
}
}
- catch (SocketException e)
- {
- // socket is closed
- close();
- }
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6911ec6..b342edc 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.streaming;
import java.io.File;
-import java.io.IOError;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
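The ConnectionHandler change above removes the SocketException catch block and instead has signalCloseDone() initialize the close future itself if close() was never requested. A standalone sketch of that idempotent-close pattern follows; the class and the use of CompletableFuture/AtomicReference are assumptions for illustration, not the actual Cassandra types:

// CloseSignalSketch.java -- hypothetical illustration of the null guard added
// to signalCloseDone() above.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class CloseSignalSketch
{
    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();

    public CompletableFuture<Void> close()
    {
        // Idempotent: only the first caller installs the future.
        closeFuture.compareAndSet(null, new CompletableFuture<>());
        return closeFuture.get();
    }

    protected void signalCloseDone()
    {
        // Mirrors the added guard: don't rely on an exception path having
        // already called close() before the future is dereferenced.
        if (closeFuture.get() == null)
            close();

        closeFuture.get().complete(null);
    }

    public static void main(String[] args)
    {
        CloseSignalSketch handler = new CloseSignalSketch();
        handler.signalCloseDone();                    // safe even without a prior close()
        System.out.println(handler.close().isDone()); // true
    }
}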
[11/16] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/778f2a46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/778f2a46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/778f2a46
Branch: refs/heads/trunk
Commit: 778f2a46e2df52aa8451aceaf17046e6b8c86ace
Parents: 9ed3b42 00e7ecf
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 6 12:33:54 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:33:54 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/ConnectionHandler.java | 8 ++--
.../cassandra/streaming/StreamReceiveTask.java | 50 +++++++++++++++-----
.../cassandra/streaming/StreamSession.java | 17 +++++--
.../streaming/StreamingTransferTest.java | 30 ++++++++++--
5 files changed, 83 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 02786c5,7d62f97..8118de1
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,14 +1,27 @@@
-2.2.8
+3.0.9
+ * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
+ * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
+ * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
+ * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
+ * Fix column ordering of results with static columns for Thrift requests in
+ a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
+ those static columns in query results (CASSANDRA-12123)
+ * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
+ * Fix EOF exception when altering column type (CASSANDRA-11820)
+Merged from 2.2:
* MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
Merged from 2.1:
- * Don't write shadowed range tombstone (CASSANDRA-12030)
- * Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349)
* Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
- * Account for partition deletions in tombstone histogram (CASSANDRA-12112)
-2.2.7
+3.0.8
+ * Fix potential race in schema during new table creation (CASSANDRA-12083)
+ * cqlsh: fix error handling in rare COPY FROM failure scenario (CASSANDRA-12070)
+ * Disable autocompaction during drain (CASSANDRA-11878)
+ * Add a metrics timer to MemtablePool and use it to track time spent blocked on memory in MemtableAllocator (CASSANDRA-11327)
+ * Fix upgrading schema with super columns with non-text subcomparators (CASSANDRA-12023)
+ * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
+Merged from 2.2:
* Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
* Validate bloom_filter_fp_chance against lowest supported
value when the table is created (CASSANDRA-11920)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6280f3a,b342edc..040906b
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -17,9 -17,7 +17,6 @@@
*/
package org.apache.cassandra.streaming;
--import java.io.File;
- import java.io.IOError;
- import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@@ -36,19 -33,13 +33,20 @@@ import org.apache.cassandra.concurrent.
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.view.View;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
-
++import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.Refs;
/**
@@@ -65,16 -55,11 +63,16 @@@ public class StreamReceiveTask extends
// total size of files to receive
private final long totalSize;
+ // Transaction tracking new files received
- public final LifecycleTransaction txn;
++ private final LifecycleTransaction txn;
+
// true if task is done (either completed or aborted)
-- private boolean done = false;
++ private volatile boolean done = false;
// holds references to SSTables received
- protected Collection<SSTableWriter> sstables;
+ protected Collection<SSTableReader> sstables;
+
+ private int remoteSSTablesReceived = 0;
public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
{
@@@ -92,18 -74,16 +90,32 @@@
*
* @param sstable SSTable file received.
*/
- public synchronized void received(SSTableWriter sstable)
+ public synchronized void received(SSTableMultiWriter sstable)
{
if (done)
++ {
++ logger.warn("[{}] Received sstable {} on already finished stream received task. Aborting sstable.", session.planId(),
++ sstable.getFilename());
++ Throwables.maybeFail(sstable.abort(null));
return;
++ }
+
- assert cfId.equals(sstable.metadata.cfId);
+ remoteSSTablesReceived++;
+ assert cfId.equals(sstable.getCfId());
- Collection<SSTableReader> finished = sstable.finish(true);
- sstables.add(sstable);
++ Collection<SSTableReader> finished = null;
++ try
++ {
++ finished = sstable.finish(true);
++ }
++ catch (Throwable t)
++ {
++ Throwables.maybeFail(sstable.abort(t));
++ }
+ txn.update(finished, false);
+ sstables.addAll(finished);
- if (sstables.size() == totalFiles)
+ if (remoteSSTablesReceived == totalFiles)
{
done = true;
executor.submit(new OnCompletionRunnable(this));
@@@ -120,6 -100,6 +132,13 @@@
return totalSize;
}
++ public synchronized LifecycleTransaction getTransaction()
++ {
++ if (done)
++ throw new RuntimeException(String.format("Stream receive task {} of cf {} already finished.", session.planId(), cfId));
++ return txn;
++ }
++
private static class OnCompletionRunnable implements Runnable
{
private final StreamReceiveTask task;
@@@ -139,71 -117,52 +158,71 @@@
if (kscf == null)
{
// schema was dropped during streaming
- for (SSTableWriter writer : task.sstables)
- writer.abort();
task.sstables.clear();
- task.txn.abort();
++ task.abortTransaction();
+ task.session.taskCompleted(task);
return;
}
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+ cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+ hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
- File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
- StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
- lockfile.create(task.sstables);
- List<SSTableReader> readers = new ArrayList<>();
- for (SSTableWriter writer : task.sstables)
- readers.add(writer.finish(true));
- lockfile.delete();
- task.sstables.clear();
+ Collection<SSTableReader> readers = task.sstables;
try (Refs<SSTableReader> refs = Refs.ref(readers))
{
- // add sstables and build secondary indexes
- cfs.addSSTables(readers);
- cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
-
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ //We have a special path for views.
+ //Since the view requires cleaning up any pre-existing state, we must put
+ //all partitions through the same write path as normal mutations.
+ //This also ensures any 2is are also updated
+ if (hasViews)
{
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- for (SSTableReader sstable : readers)
- boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
-
- if (cfs.isRowCacheEnabled())
+ for (SSTableReader reader : readers)
{
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ try (ISSTableScanner scanner = reader.getScanner())
+ {
+ while (scanner.hasNext())
+ {
+ try (UnfilteredRowIterator rowIterator = scanner.next())
+ {
+ //Apply unsafe (we will flush below before transaction is done)
+ new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
+ }
+ }
+ }
}
+ }
+ else
+ {
- task.txn.finish();
++ task.finishTransaction();
- if (cfs.metadata.isCounter())
+ // add sstables and build secondary indexes
+ cfs.addSSTables(readers);
+ cfs.indexManager.buildAllIndexesBlocking(readers);
+
+ //invalidate row and counter cache
+ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
{
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+ readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
+ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+ if (cfs.isRowCacheEnabled())
+ {
+ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getTableName());
+ }
+
+ if (cfs.metadata.isCounter())
+ {
+ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getTableName());
+ }
}
}
}
@@@ -211,21 -171,10 +230,20 @@@
}
catch (Throwable t)
{
-- logger.error("Error applying streamed data: ", t);
JVMStabilityInspector.inspectThrowable(t);
task.session.onError(t);
}
+ finally
+ {
+ //We don't keep the streamed sstables since we've applied them manually
+ //So we abort the txn and delete the streamed sstables
+ if (hasViews)
+ {
+ if (cfs != null)
+ cfs.forceBlockingFlush();
- task.txn.abort();
++ task.abortTransaction();
+ }
+ }
}
}
@@@ -241,7 -190,8 +259,17 @@@
return;
done = true;
- txn.abort();
- for (SSTableWriter writer : sstables)
- writer.abort();
++ abortTransaction();
sstables.clear();
}
++
++ private synchronized void abortTransaction()
++ {
++ txn.abort();
++ }
++
++ private synchronized void finishTransaction()
++ {
++ txn.finish();
++ }
}
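The StreamReceiveTask changes above introduce a guarded lifecycle: received() aborts sstables that arrive after the task is done, and getTransaction() refuses to hand out the transaction once finished. A compact sketch of that pattern, with plain stand-ins for SSTableMultiWriter and LifecycleTransaction (both hypothetical here):

// ReceiveTaskSketch.java -- hypothetical illustration of the done-flag guard.
import java.util.ArrayList;
import java.util.List;

public class ReceiveTaskSketch
{
    private volatile boolean done = false;
    private int received = 0;
    private final int totalFiles;
    private final List<String> sstables = new ArrayList<>();

    ReceiveTaskSketch(int totalFiles) { this.totalFiles = totalFiles; }

    public synchronized void received(String sstable)
    {
        if (done)
        {
            // The real code also aborts the writer via Throwables.maybeFail.
            System.out.println("Aborting late sstable " + sstable);
            return;
        }
        sstables.add(sstable);
        if (++received == totalFiles)
            done = true; // the completion runnable would be submitted here
    }

    public synchronized Object getTransaction()
    {
        if (done)
            throw new RuntimeException("task already finished");
        return new Object(); // stand-in for the LifecycleTransaction
    }

    public static void main(String[] args)
    {
        ReceiveTaskSketch task = new ReceiveTaskSketch(1);
        task.received("sstable-1"); // completes the task
        task.received("sstable-2"); // late arrival: aborted, not added
    }
}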
http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index bfbedc7,f4c900e..12f561b
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -211,12 -212,6 +211,12 @@@ public class StreamSession implements I
}
+ public LifecycleTransaction getTransaction(UUID cfId)
+ {
+ assert receivers.containsKey(cfId);
- return receivers.get(cfId).txn;
++ return receivers.get(cfId).getTransaction();
+ }
+
/**
* Bind this session to report to specific {@link StreamResultFuture} and
* perform pre-streaming initialization.
@@@ -281,8 -276,8 +281,9 @@@
* @param flushTables flush tables?
* @param repairedAt the time the repair started.
*/
-- public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
++ public synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
{
++ failIfFinished();
Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, columnFamilies);
if (flushTables)
flushSSTables(stores);
@@@ -300,6 -295,6 +301,12 @@@
}
}
++ private void failIfFinished()
++ {
++ if (state() == State.COMPLETE || state() == State.FAILED)
++ throw new RuntimeException(String.format("Stream %s is finished with state %s", planId(), state().name()));
++ }
++
private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies)
{
Collection<ColumnFamilyStore> stores = new HashSet<>();
@@@ -371,8 -369,8 +378,9 @@@
}
}
-- public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
++ public synchronized void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
{
++ failIfFinished();
Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
while (iter.hasNext())
{
@@@ -745,8 -743,8 +753,9 @@@
FBUtilities.waitOnFutures(flushes);
}
-- private void prepareReceiving(StreamSummary summary)
++ private synchronized void prepareReceiving(StreamSummary summary)
{
++ failIfFinished();
if (summary.files > 0)
receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize));
}
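The StreamSession hunks above make the mutating entry points synchronized and add a failIfFinished() guard so that transfers can no longer be registered on a session in a terminal state. A small self-contained sketch of that fail-fast guard, with a hypothetical State enum and session class standing in for the real ones:

// SessionGuardSketch.java -- hypothetical illustration of failIfFinished().
import java.util.concurrent.atomic.AtomicReference;

public class SessionGuardSketch
{
    enum State { PREPARING, STREAMING, COMPLETE, FAILED }

    private final AtomicReference<State> state = new AtomicReference<>(State.PREPARING);

    private void failIfFinished()
    {
        State s = state.get();
        if (s == State.COMPLETE || s == State.FAILED)
            throw new RuntimeException(String.format("Stream is finished with state %s", s));
    }

    public synchronized void addTransferRanges()
    {
        failIfFinished();
        // ... register ranges ...
    }

    public static void main(String[] args)
    {
        SessionGuardSketch session = new SessionGuardSketch();
        session.addTransferRanges();          // fine while PREPARING
        session.state.set(State.COMPLETE);
        try
        {
            session.addTransferRanges();      // throws: session already finished
        }
        catch (RuntimeException e)
        {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}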
http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 7223e76,2b16267..6be880c
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -229,14 -238,14 +229,38 @@@ public class StreamingTransferTes
List<Range<Token>> ranges = new ArrayList<>();
// wrapped range
ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0"))));
-- new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName()).execute().get();
++ StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName());
++ streamPlan.execute().get();
verifyConnectionsAreClosed();
++
++ //cannot add ranges after stream session is finished
++ try
++ {
++ streamPlan.transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName());
++ fail("Should have thrown exception");
++ }
++ catch (RuntimeException e)
++ {
++ //do nothing
++ }
}
private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
{
-- new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))).execute().get();
++ StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
++ streamPlan.execute().get();
verifyConnectionsAreClosed();
++
++ //cannot add files after stream session is finished
++ try
++ {
++ streamPlan.transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
++ fail("Should have thrown exception");
++ }
++ catch (RuntimeException e)
++ {
++ //do nothing
++ }
}
/**
@@@ -312,36 -325,27 +336,36 @@@
String cfname = "StandardInteger1";
Keyspace keyspace = Keyspace.open(ks);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+ ClusteringComparator comparator = cfs.getComparator();
- String key = "key0";
- Mutation rm = new Mutation(ks, ByteBufferUtil.bytes(key));
- // add columns of size slightly less than column_index_size to force insert column index
- rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
- rm.add(cfname, cellname(6), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]), 2);
- ColumnFamily cf = rm.addOrGet(cfname);
- // add RangeTombstones
- cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
- cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
- cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
- rm.applyUnsafe();
+ String key = "key1";
+
+
+ RowUpdateBuilder updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
- key = "key1";
- rm = new Mutation(ks, ByteBufferUtil.bytes(key));
// add columns of size slightly less than column_index_size to force insert column index
- rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
- cf = rm.addOrGet(cfname);
+ updates.clustering(1)
+ .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]))
+ .build()
+ .apply();
+
+ updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
+ updates.clustering(6)
+ .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]))
- .build()
++ .build()
+ .apply();
+
// add RangeTombstones
- cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
- rm.applyUnsafe();
+ //updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1 , key);
+ //updates.addRangeTombstone(Slice.make(comparator, comparator.make(2), comparator.make(4)))
+ // .build()
+ // .apply();
+
+
+ updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1, key);
+ updates.addRangeTombstone(Slice.make(comparator.make(5), comparator.make(7)))
+ .build()
+ .apply();
cfs.forceBlockingFlush();
[14/16] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.9
Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.9
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/59ee46e5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/59ee46e5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/59ee46e5
Branch: refs/heads/trunk
Commit: 59ee46e55a15775a49edde86de81b9b79875731d
Parents: 5ad1763 778f2a4
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 6 12:34:22 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:34:22 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/ConnectionHandler.java | 8 ++--
.../cassandra/streaming/StreamReceiveTask.java | 50 +++++++++++++++-----
.../cassandra/streaming/StreamSession.java | 17 +++++--
.../streaming/StreamingTransferTest.java | 30 ++++++++++--
5 files changed, 83 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59ee46e5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2861cf7,8118de1..d459e34
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
-3.0.9
+3.9
+ * Fix SASI PREFIX search in CONTAINS mode with partial terms (CASSANDRA-12073)
+ * Increase size of flushExecutor thread pool (CASSANDRA-12071)
+Merged from 3.0:
+ * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
* Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
* Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
* Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59ee46e5/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59ee46e5/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59ee46e5/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
[16/16] cassandra git commit: Merge branch 'cassandra-3.9' into trunk
Posted by yu...@apache.org.
Merge branch 'cassandra-3.9' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9fd60777
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9fd60777
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9fd60777
Branch: refs/heads/trunk
Commit: 9fd607778091c48910db557d7a95029cac077244
Parents: b4133f3 59ee46e
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 6 12:34:30 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:34:30 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/ConnectionHandler.java | 8 ++--
.../cassandra/streaming/StreamReceiveTask.java | 50 +++++++++++++++-----
.../cassandra/streaming/StreamSession.java | 17 +++++--
.../streaming/StreamingTransferTest.java | 30 ++++++++++--
5 files changed, 83 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fd60777/CHANGES.txt
----------------------------------------------------------------------
[02/16] cassandra git commit: Range tombstones that are masked by row
tombstones should not be written out
Posted by yu...@apache.org.
Range tombstones that are masked by row tombstones should not be written out
patch by Nachiket Patil; reviewed by Sylvain Lebresne for CASSANDRA-12030
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/98f5f77b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/98f5f77b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/98f5f77b
Branch: refs/heads/trunk
Commit: 98f5f77bb3c5d50e52cbb6f577a463ca8a5134ad
Parents: 3c1653f
Author: Nachiket Patil <na...@apple.com>
Authored: Wed Jul 6 11:22:56 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jul 6 14:35:10 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/LazilyCompactedRow.java | 3 +-
.../apache/cassandra/db/RangeTombstoneTest.java | 40 ++++++++++++++++++++
3 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98f5f77b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b1dcbe1..7fa995d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.16
+ * Don't write shadowed range tombstone (CASSANDRA-12030)
* Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
* Reduce contention getting instances of CompositeType (CASSANDRA-10433)
* Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98f5f77b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index f912da2..dab5eeb 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -286,7 +286,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow
RangeTombstone t = tombstone;
tombstone = null;
- if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp())
+ if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp() ||
+ maxRowTombstone.markedForDeleteAt >= t.timestamp())
{
indexBuilder.tombstoneTracker().update(t, true);
return null;
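
[Editor's sketch] The patched condition drops a range tombstone either when it is both GC-able and purgeable, or when the partition-level row tombstone deletes at an equal or newer timestamp, i.e. the range tombstone is fully shadowed. Java precedence groups the new condition as (gcAble && purgeable) || shadowed, which is the intended reading. A minimal, self-contained sketch of that purge rule follows, with plain longs standing in for Cassandra's DeletionTime and RangeTombstone types; the names are illustrative only.

    // Sketch of the purge rule after CASSANDRA-12030, with primitive
    // timestamps in place of Cassandra's DeletionTime/RangeTombstone.
    final class PurgeRuleSketch
    {
        // true when the range tombstone can be skipped during compaction
        static boolean canSkipRangeTombstone(boolean gcAble,
                                             long rangeTombstoneTimestamp,
                                             long maxPurgeableTimestamp,
                                             long rowTombstoneMarkedForDeleteAt)
        {
            // gcAble && purgeable: the tombstone has expired and no older data
            // it could shadow remains elsewhere
            boolean purgeable = gcAble && rangeTombstoneTimestamp < maxPurgeableTimestamp;
            // shadowed: the row-level tombstone already deletes everything the
            // range tombstone covers, at an equal or newer timestamp
            boolean shadowed = rowTombstoneMarkedForDeleteAt >= rangeTombstoneTimestamp;
            return purgeable || shadowed;
        }
    }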
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98f5f77b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index 3292422..dfd6960 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.IndexType;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.CellNames;
@@ -543,6 +544,45 @@ public class RangeTombstoneTest extends SchemaLoader
}
@Test
+ public void testCompactionOfRangeTombstonesCoveredByRowTombstone() throws Exception
+ {
+ long testTimeStamp = 1451606400L; // 01/01/2016 : 00:00:00 GMT
+ Keyspace table = Keyspace.open(KSNAME);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME);
+ ByteBuffer key = ByteBufferUtil.bytes("k4");
+
+ // remove any existing sstables before starting
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+ cfs.setCompactionStrategyClass(LeveledCompactionStrategy.class.getCanonicalName());
+
+ Mutation rm = new Mutation(KSNAME, key);
+ for (int i = 1; i < 11; i += 2, testTimeStamp += i * 10)
+ add(rm, i, testTimeStamp);
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ rm = new Mutation(KSNAME, key);
+ ColumnFamily cf = rm.addOrGet(CFNAME);
+
+ // Write the covering row tombstone
+ cf.delete(new DeletionTime(++testTimeStamp, (int) testTimeStamp));
+
+ // Create range tombstones covered by row tombstone above.
+ for (int i = 1; i < 11; i += 2, testTimeStamp -= i * 5)
+ delete(cf, 0, 7, testTimeStamp);
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ // there should be 2 sstables
+ assertEquals(2, cfs.getSSTables().size());
+
+ // compact down to nothing
+ CompactionManager.instance.performMaximal(cfs);
+ assertEquals(0, cfs.getSSTables().size());
+ }
+
+ @Test
public void testOverwritesToDeletedColumns() throws Exception
{
Keyspace table = Keyspace.open(KSNAME);
[10/16] cassandra git commit: Improve streaming synchronization and
fault tolerance
Posted by yu...@apache.org.
Improve streaming synchronization and fault tolerance
Patch by Paulo Motta; Reviewed by yukim for CASSANDRA-11414
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/00e7ecf1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/00e7ecf1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/00e7ecf1
Branch: refs/heads/cassandra-2.2
Commit: 00e7ecf1394f8704e2f13369f7950e129459ce2c
Parents: 43c741e
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Jul 6 12:16:16 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:32:39 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/streaming/ConnectionHandler.java | 8 +++-----
.../org/apache/cassandra/streaming/StreamReceiveTask.java | 2 --
3 files changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bfd8aa2..7d62f97 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.8
+ * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
* MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
Merged from 2.1:
* Don't write shadowed range tombstone (CASSANDRA-12030)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index c497a39..364435e 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -233,6 +233,9 @@ public class ConnectionHandler
protected void signalCloseDone()
{
+ if (closeFuture == null)
+ close();
+
closeFuture.get().set(null);
// We can now close the socket
@@ -294,11 +297,6 @@ public class ConnectionHandler
}
}
}
- catch (SocketException e)
- {
- // socket is closed
- close();
- }
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
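
[Editor's sketch] The ConnectionHandler change replaces the SocketException special case with an unconditional guard in signalCloseDone(): if the message loop winds down before anyone requested a close, the handler initiates the close itself, so closeFuture is always initialized before it is completed. A minimal, self-contained sketch of that pattern follows, using CompletableFuture in place of Cassandra's internal future type; names are illustrative, not the real class.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicReference;

    // Sketch of the close-signalling pattern after this patch: close() lazily
    // creates the future callers wait on, and signalCloseDone() initiates the
    // close itself if nobody asked for one yet, instead of relying on a
    // SocketException catch block in the message loop.
    final class CloseSignalSketch
    {
        private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();

        CompletableFuture<Void> close()
        {
            closeFuture.compareAndSet(null, new CompletableFuture<>());
            return closeFuture.get();
        }

        void signalCloseDone()
        {
            if (closeFuture.get() == null)
                close(); // loop exited on its own; make sure the future exists

            closeFuture.get().complete(null); // wake anyone blocked in close()
        }
    }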
http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6911ec6..b342edc 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.streaming;
import java.io.File;
-import java.io.IOError;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
[05/16] cassandra git commit: Merge commit
'43c741e251102bf5651ff8aa1b5ca078eb0ddc0b' into cassandra-3.0
Posted by yu...@apache.org.
Merge commit '43c741e251102bf5651ff8aa1b5ca078eb0ddc0b' into cassandra-3.0
* commit '43c741e251102bf5651ff8aa1b5ca078eb0ddc0b':
Range tombstones that are masked by row tombstones should not be written out
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ed3b42d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ed3b42d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ed3b42d
Branch: refs/heads/trunk
Commit: 9ed3b42d3b50237f99485233857a7b34d5238d9a
Parents: dd05e46 43c741e
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jul 6 14:39:52 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jul 6 14:39:52 2016 +0200
----------------------------------------------------------------------
----------------------------------------------------------------------
[13/16] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/778f2a46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/778f2a46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/778f2a46
Branch: refs/heads/cassandra-3.0
Commit: 778f2a46e2df52aa8451aceaf17046e6b8c86ace
Parents: 9ed3b42 00e7ecf
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 6 12:33:54 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:33:54 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/ConnectionHandler.java | 8 ++--
.../cassandra/streaming/StreamReceiveTask.java | 50 +++++++++++++++-----
.../cassandra/streaming/StreamSession.java | 17 +++++--
.../streaming/StreamingTransferTest.java | 30 ++++++++++--
5 files changed, 83 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 02786c5,7d62f97..8118de1
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,14 +1,27 @@@
-2.2.8
+3.0.9
+ * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
+ * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
+ * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
+ * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
+ * Fix column ordering of results with static columns for Thrift requests in
+ a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
+ those static columns in query results (CASSANDRA-12123)
+ * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
+ * Fix EOF exception when altering column type (CASSANDRA-11820)
+Merged from 2.2:
* MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
Merged from 2.1:
- * Don't write shadowed range tombstone (CASSANDRA-12030)
- * Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349)
* Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
- * Account for partition deletions in tombstone histogram (CASSANDRA-12112)
-2.2.7
+3.0.8
+ * Fix potential race in schema during new table creation (CASSANDRA-12083)
+ * cqlsh: fix error handling in rare COPY FROM failure scenario (CASSANDRA-12070)
+ * Disable autocompaction during drain (CASSANDRA-11878)
+ * Add a metrics timer to MemtablePool and use it to track time spent blocked on memory in MemtableAllocator (CASSANDRA-11327)
+ * Fix upgrading schema with super columns with non-text subcomparators (CASSANDRA-12023)
+ * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
+Merged from 2.2:
* Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
* Validate bloom_filter_fp_chance against lowest supported
value when the table is created (CASSANDRA-11920)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6280f3a,b342edc..040906b
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -17,9 -17,7 +17,6 @@@
*/
package org.apache.cassandra.streaming;
--import java.io.File;
- import java.io.IOError;
- import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@@ -36,19 -33,13 +33,20 @@@ import org.apache.cassandra.concurrent.
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.view.View;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
-
++import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.Refs;
/**
@@@ -65,16 -55,11 +63,16 @@@ public class StreamReceiveTask extends
// total size of files to receive
private final long totalSize;
+ // Transaction tracking new files received
- public final LifecycleTransaction txn;
++ private final LifecycleTransaction txn;
+
// true if task is done (either completed or aborted)
-- private boolean done = false;
++ private volatile boolean done = false;
// holds references to SSTables received
- protected Collection<SSTableWriter> sstables;
+ protected Collection<SSTableReader> sstables;
+
+ private int remoteSSTablesReceived = 0;
public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
{
@@@ -92,18 -74,16 +90,32 @@@
*
* @param sstable SSTable file received.
*/
- public synchronized void received(SSTableWriter sstable)
+ public synchronized void received(SSTableMultiWriter sstable)
{
if (done)
++ {
++ logger.warn("[{}] Received sstable {} on already finished stream received task. Aborting sstable.", session.planId(),
++ sstable.getFilename());
++ Throwables.maybeFail(sstable.abort(null));
return;
++ }
+
- assert cfId.equals(sstable.metadata.cfId);
+ remoteSSTablesReceived++;
+ assert cfId.equals(sstable.getCfId());
- Collection<SSTableReader> finished = sstable.finish(true);
- sstables.add(sstable);
++ Collection<SSTableReader> finished = null;
++ try
++ {
++ finished = sstable.finish(true);
++ }
++ catch (Throwable t)
++ {
++ Throwables.maybeFail(sstable.abort(t));
++ }
+ txn.update(finished, false);
+ sstables.addAll(finished);
- if (sstables.size() == totalFiles)
+ if (remoteSSTablesReceived == totalFiles)
{
done = true;
executor.submit(new OnCompletionRunnable(this));
@@@ -120,6 -100,6 +132,13 @@@
return totalSize;
}
++ public synchronized LifecycleTransaction getTransaction()
++ {
++ if (done)
++ throw new RuntimeException(String.format("Stream receive task %s of cf %s already finished.", session.planId(), cfId));
++ return txn;
++ }
++
private static class OnCompletionRunnable implements Runnable
{
private final StreamReceiveTask task;
@@@ -139,71 -117,52 +158,71 @@@
if (kscf == null)
{
// schema was dropped during streaming
- for (SSTableWriter writer : task.sstables)
- writer.abort();
task.sstables.clear();
- task.txn.abort();
++ task.abortTransaction();
+ task.session.taskCompleted(task);
return;
}
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+ cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+ hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
- File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
- StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
- lockfile.create(task.sstables);
- List<SSTableReader> readers = new ArrayList<>();
- for (SSTableWriter writer : task.sstables)
- readers.add(writer.finish(true));
- lockfile.delete();
- task.sstables.clear();
+ Collection<SSTableReader> readers = task.sstables;
try (Refs<SSTableReader> refs = Refs.ref(readers))
{
- // add sstables and build secondary indexes
- cfs.addSSTables(readers);
- cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
-
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ //We have a special path for views.
+ //Since the view requires cleaning up any pre-existing state, we must put
+ //all partitions through the same write path as normal mutations.
+ //This also ensures any 2is are also updated
+ if (hasViews)
{
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- for (SSTableReader sstable : readers)
- boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
-
- if (cfs.isRowCacheEnabled())
+ for (SSTableReader reader : readers)
{
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ try (ISSTableScanner scanner = reader.getScanner())
+ {
+ while (scanner.hasNext())
+ {
+ try (UnfilteredRowIterator rowIterator = scanner.next())
+ {
+ //Apply unsafe (we will flush below before transaction is done)
+ new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
+ }
+ }
+ }
}
+ }
+ else
+ {
- task.txn.finish();
++ task.finishTransaction();
- if (cfs.metadata.isCounter())
+ // add sstables and build secondary indexes
+ cfs.addSSTables(readers);
+ cfs.indexManager.buildAllIndexesBlocking(readers);
+
+ //invalidate row and counter cache
+ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
{
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+ readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
+ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+ if (cfs.isRowCacheEnabled())
+ {
+ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getTableName());
+ }
+
+ if (cfs.metadata.isCounter())
+ {
+ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getTableName());
+ }
}
}
}
@@@ -211,21 -171,10 +230,20 @@@
}
catch (Throwable t)
{
-- logger.error("Error applying streamed data: ", t);
JVMStabilityInspector.inspectThrowable(t);
task.session.onError(t);
}
+ finally
+ {
+ //We don't keep the streamed sstables since we've applied them manually
+ //So we abort the txn and delete the streamed sstables
+ if (hasViews)
+ {
+ if (cfs != null)
+ cfs.forceBlockingFlush();
- task.txn.abort();
++ task.abortTransaction();
+ }
+ }
}
}
@@@ -241,7 -190,8 +259,17 @@@
return;
done = true;
- txn.abort();
- for (SSTableWriter writer : sstables)
- writer.abort();
++ abortTransaction();
sstables.clear();
}
++
++ private synchronized void abortTransaction()
++ {
++ txn.abort();
++ }
++
++ private synchronized void finishTransaction()
++ {
++ txn.finish();
++ }
}
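
[Editor's sketch] Taken together, the StreamReceiveTask changes funnel every touch of the lifecycle transaction through synchronized methods, make done volatile, and let late callers fail fast rather than race a concurrent abort. A minimal sketch of that discipline follows, with a stand-in Txn interface instead of LifecycleTransaction; illustrative only.

    // Sketch of the synchronization discipline adopted above: volatile `done`
    // for cheap reads, synchronized state transitions and transaction access,
    // and fail-fast access once the task has finished.
    final class ReceiveTaskSketch
    {
        interface Txn { void finish(); void abort(); }

        private final Txn txn;
        private volatile boolean done = false;

        ReceiveTaskSketch(Txn txn) { this.txn = txn; }

        synchronized Txn getTransaction()
        {
            if (done)
                throw new RuntimeException("Stream receive task already finished");
            return txn;
        }

        synchronized void complete()
        {
            if (!done) { done = true; txn.finish(); }
        }

        synchronized void abort()
        {
            if (!done) { done = true; txn.abort(); }
        }
    }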
http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index bfbedc7,f4c900e..12f561b
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -211,12 -212,6 +211,12 @@@ public class StreamSession implements I
}
+ public LifecycleTransaction getTransaction(UUID cfId)
+ {
+ assert receivers.containsKey(cfId);
- return receivers.get(cfId).txn;
++ return receivers.get(cfId).getTransaction();
+ }
+
/**
* Bind this session to report to specific {@link StreamResultFuture} and
* perform pre-streaming initialization.
@@@ -281,8 -276,8 +281,9 @@@
* @param flushTables flush tables?
* @param repairedAt the time the repair started.
*/
-- public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
++ public synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
{
++ failIfFinished();
Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, columnFamilies);
if (flushTables)
flushSSTables(stores);
@@@ -300,6 -295,6 +301,12 @@@
}
}
++ private void failIfFinished()
++ {
++ if (state() == State.COMPLETE || state() == State.FAILED)
++ throw new RuntimeException(String.format("Stream %s is finished with state %s", planId(), state().name()));
++ }
++
private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies)
{
Collection<ColumnFamilyStore> stores = new HashSet<>();
@@@ -371,8 -369,8 +378,9 @@@
}
}
-- public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
++ public synchronized void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
{
++ failIfFinished();
Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
while (iter.hasNext())
{
@@@ -745,8 -743,8 +753,9 @@@
FBUtilities.waitOnFutures(flushes);
}
-- private void prepareReceiving(StreamSummary summary)
++ private synchronized void prepareReceiving(StreamSummary summary)
{
++ failIfFinished();
if (summary.files > 0)
receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize));
}
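
[Editor's sketch] The failIfFinished() guard added to StreamSession makes the mutating entry points (addTransferRanges, addTransferFiles, prepareReceiving) reject new work once the session has completed or failed. A self-contained sketch of the same guard follows, with a simplified State enum standing in for the session's real state machine; names are illustrative.

    import java.util.UUID;

    // Sketch of the fail-fast session guard: mutating entry points are
    // synchronized and check the terminal states before accepting work.
    final class SessionGuardSketch
    {
        enum State { PREPARING, STREAMING, COMPLETE, FAILED }

        private final UUID planId = UUID.randomUUID();
        private volatile State state = State.PREPARING;

        synchronized void addTransferFiles(Object sstableDetails)
        {
            failIfFinished(); // reject work on a finished session
            // ... register the files with the session ...
        }

        private void failIfFinished()
        {
            if (state == State.COMPLETE || state == State.FAILED)
                throw new RuntimeException(String.format("Stream %s is finished with state %s", planId, state));
        }
    }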
http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 7223e76,2b16267..6be880c
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -229,14 -238,14 +229,38 @@@ public class StreamingTransferTest
List<Range<Token>> ranges = new ArrayList<>();
// wrapped range
ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0"))));
-- new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName()).execute().get();
++ StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName());
++ streamPlan.execute().get();
verifyConnectionsAreClosed();
++
++ //cannot add ranges after stream session is finished
++ try
++ {
++ streamPlan.transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName());
++ fail("Should have thrown exception");
++ }
++ catch (RuntimeException e)
++ {
++ //do nothing
++ }
}
private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
{
-- new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))).execute().get();
++ StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
++ streamPlan.execute().get();
verifyConnectionsAreClosed();
++
++ //cannot add files after stream session is finished
++ try
++ {
++ streamPlan.transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
++ fail("Should have thrown exception");
++ }
++ catch (RuntimeException e)
++ {
++ //do nothing
++ }
}
/**
@@@ -312,36 -325,27 +336,36 @@@
String cfname = "StandardInteger1";
Keyspace keyspace = Keyspace.open(ks);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+ ClusteringComparator comparator = cfs.getComparator();
- String key = "key0";
- Mutation rm = new Mutation(ks, ByteBufferUtil.bytes(key));
- // add columns of size slightly less than column_index_size to force insert column index
- rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
- rm.add(cfname, cellname(6), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]), 2);
- ColumnFamily cf = rm.addOrGet(cfname);
- // add RangeTombstones
- cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
- cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
- cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
- rm.applyUnsafe();
+ String key = "key1";
+
+
+ RowUpdateBuilder updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
- key = "key1";
- rm = new Mutation(ks, ByteBufferUtil.bytes(key));
// add columns of size slightly less than column_index_size to force insert column index
- rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
- cf = rm.addOrGet(cfname);
+ updates.clustering(1)
+ .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]))
+ .build()
+ .apply();
+
+ updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
+ updates.clustering(6)
+ .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]))
- .build()
++ .build()
+ .apply();
+
// add RangeTombstones
- cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
- rm.applyUnsafe();
+ //updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1 , key);
+ //updates.addRangeTombstone(Slice.make(comparator, comparator.make(2), comparator.make(4)))
+ // .build()
+ // .apply();
+
+
+ updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1, key);
+ updates.addRangeTombstone(Slice.make(comparator.make(5), comparator.make(7)))
+ .build()
+ .apply();
cfs.forceBlockingFlush();
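
[Editor's sketch] The test additions assert that reusing a finished StreamPlan throws. The try/fail/catch idiom they use can be factored into a small helper; a sketch follows, assuming JUnit 4 (which these tests import) and passing the reuse attempt as a Runnable placeholder, e.g. () -> streamPlan.transferRanges(...).

    import static org.junit.Assert.fail;

    // Sketch of the assertion idiom in the tests above: run the reuse attempt
    // and insist it throws once the session is finished.
    final class ThrowsAssertSketch
    {
        static void assertRejectsAfterFinish(Runnable reuse)
        {
            try
            {
                reuse.run();
                fail("Should have thrown exception");
            }
            catch (RuntimeException expected)
            {
                // expected: the stream session is already finished
            }
        }
    }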
[03/16] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
* cassandra-2.1:
Range tombstones that are masked by row tombstones should not be written out
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/43c741e2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/43c741e2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/43c741e2
Branch: refs/heads/trunk
Commit: 43c741e251102bf5651ff8aa1b5ca078eb0ddc0b
Parents: d5a15e4 98f5f77
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jul 6 14:39:13 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jul 6 14:39:13 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/LazilyCompactedRow.java | 3 +-
.../apache/cassandra/db/RangeTombstoneTest.java | 40 ++++++++++++++++++++
3 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/43c741e2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 65c7c1f,7fa995d..bfd8aa2
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,43 -1,11 +1,44 @@@
-2.1.16
+2.2.8
+ * MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
+Merged from 2.1:
+ * Don't write shadowed range tombstone (CASSANDRA-12030)
- * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
- * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
* Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349)
-
-2.1.15
+ * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
* Account for partition deletions in tombstone histogram (CASSANDRA-12112)
+
+
+2.2.7
+ * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
+ * Validate bloom_filter_fp_chance against lowest supported
+ value when the table is created (CASSANDRA-11920)
+ * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013)
+ * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038)
+ * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
+ * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
+ * Run CommitLog tests with different compression settings (CASSANDRA-9039)
+ * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
+ * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
+ * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
+ * Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
+ * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
+ * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
+ * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
+ * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
+ * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
+ * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
+ * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
+ * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
+ * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510)
+ * JSON datetime formatting needs timezone (CASSANDRA-11137)
+ * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
+ * Remove unnecessary file existence check during anticompaction (CASSANDRA-11660)
+ * Add missing files to debian packages (CASSANDRA-11642)
+ * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621)
+ * cqlsh: COPY FROM should use regular inserts for single statement batches and
+ report errors correctly if worker processes crash on initialization (CASSANDRA-11474)
+ * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
+ * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
+Merged from 2.1:
* Avoid stalling paxos when the paxos state expires (CASSANDRA-12043)
* Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
* Don't try to get sstables for non-repairing column families (CASSANDRA-12077)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/43c741e2/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/43c741e2/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index 9ce1236,dfd6960..bff0ddf
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@@ -37,10 -33,13 +37,11 @@@ import org.junit.BeforeClass
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.*;
import org.apache.cassandra.Util;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.IndexType;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.CellNames;
@@@ -559,6 -543,46 +560,45 @@@ public class RangeTombstoneTest
}
@Test
+ public void testCompactionOfRangeTombstonesCoveredByRowTombstone() throws Exception
+ {
+ long testTimeStamp = 1451606400L; // 01/01/2016 : 00:00:00 GMT
+ Keyspace table = Keyspace.open(KSNAME);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME);
+ ByteBuffer key = ByteBufferUtil.bytes("k4");
+
+ // remove any existing sstables before starting
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+ cfs.setCompactionStrategyClass(LeveledCompactionStrategy.class.getCanonicalName());
+
+ Mutation rm = new Mutation(KSNAME, key);
+ for (int i = 1; i < 11; i += 2, testTimeStamp += i * 10)
+ add(rm, i, testTimeStamp);
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ rm = new Mutation(KSNAME, key);
+ ColumnFamily cf = rm.addOrGet(CFNAME);
+
+ // Write the covering row tombstone
+ cf.delete(new DeletionTime(++testTimeStamp, (int) testTimeStamp));
+
+ // Create range tombstones covered by row tombstone above.
+ for (int i = 1; i < 11; i += 2, testTimeStamp -= i * 5)
+ delete(cf, 0, 7, testTimeStamp);
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ // there should be 2 sstables
+ assertEquals(2, cfs.getSSTables().size());
+
+ // compact down to nothing
- CompactionManager.instance.performMaximal(cfs);
++ CompactionManager.instance.performMaximal(cfs, false);
+ assertEquals(0, cfs.getSSTables().size());
+ }
+
+ @Test
public void testOverwritesToDeletedColumns() throws Exception
{
Keyspace table = Keyspace.open(KSNAME);