You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/12/01 20:08:24 UTC
[01/15] cassandra git commit: Add proper error handling to stream receiver
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 7650fc196 -> 5ba69a325
refs/heads/cassandra-2.2 1b81ad19d -> 2491ede35
refs/heads/cassandra-3.0 803a3d901 -> ccb20ad46
refs/heads/cassandra-3.1 6bda8868c -> 5b6a368c9
refs/heads/trunk 5daf8d020 -> 03863ed24
Add proper error handling to stream receiver
patch by Paulo Motta; reviewed by yukim for CASSANDRA-10774
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ba69a32
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ba69a32
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ba69a32
Branch: refs/heads/cassandra-2.1
Commit: 5ba69a32590074610f5516a20b8198416b79dfcf
Parents: 7650fc1
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Nov 27 16:37:37 2015 -0800
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 11:53:35 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 105 ++++++++++---------
2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a2f7b6e..3ce2da6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
* Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
* Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 738c93c..8773cab 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -116,63 +117,73 @@ public class StreamReceiveTask extends StreamTask
public void run()
{
- Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
- if (kscf == null)
+ try
{
- // schema was dropped during streaming
+ Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+ if (kscf == null)
+ {
+ // schema was dropped during streaming
+ for (SSTableWriter writer : task.sstables)
+ writer.abort();
+ task.sstables.clear();
+ task.session.taskCompleted(task);
+ return;
+ }
+ ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+ File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
+ if (lockfiledir == null)
+ throw new IOError(new IOException("All disks full"));
+ StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
+ lockfile.create(task.sstables);
+ List<SSTableReader> readers = new ArrayList<>();
for (SSTableWriter writer : task.sstables)
- writer.abort();
+ readers.add(writer.closeAndOpenReader());
+ lockfile.delete();
task.sstables.clear();
- return;
- }
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-
- File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
- if (lockfiledir == null)
- throw new IOError(new IOException("All disks full"));
- StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
- lockfile.create(task.sstables);
- List<SSTableReader> readers = new ArrayList<>();
- for (SSTableWriter writer : task.sstables)
- readers.add(writer.closeAndOpenReader());
- lockfile.delete();
- task.sstables.clear();
-
- try (Refs<SSTableReader> refs = Refs.ref(readers))
- {
- // add sstables and build secondary indexes
- cfs.addSSTables(readers);
- cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ try (Refs<SSTableReader> refs = Refs.ref(readers))
{
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- for (SSTableReader sstable : readers)
- boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+ // add sstables and build secondary indexes
+ cfs.addSSTables(readers);
+ cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- if (cfs.isRowCacheEnabled())
+ //invalidate row and counter cache
+ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
{
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
- }
-
- if (cfs.metadata.isCounter())
- {
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+ for (SSTableReader sstable : readers)
+ boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+ if (cfs.isRowCacheEnabled())
+ {
+ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
+
+ if (cfs.metadata.isCounter())
+ {
+ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
}
}
- }
- task.session.taskCompleted(task);
+ task.session.taskCompleted(task);
+ }
+ catch (Throwable t)
+ {
+ logger.error("Error applying streamed data: ", t);
+ JVMStabilityInspector.inspectThrowable(t);
+ task.session.onError(t);
+ }
}
}
[11/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ccb20ad4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ccb20ad4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ccb20ad4
Branch: refs/heads/cassandra-3.1
Commit: ccb20ad46ab38961aac39cc8634f450046bdf16b
Parents: 803a3d9 2491ede
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:07:39 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:07:39 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 60 ++++++++++----------
2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb20ad4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7fffbbf,7541212..a01011b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -23,7 -8,16 +23,8 @@@ Merged from 2.2
* Fix SimpleDateType type compatibility (CASSANDRA-10027)
* (Hadoop) fix splits calculation (CASSANDRA-10640)
* (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
- * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
- * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
- * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
- * Expose phi values from failure detector via JMX and tweak debug
- and trace logging (CASSANDRA-9526)
- * Fix RangeNamesQueryPager (CASSANDRA-10509)
- * Deprecate Pig support (CASSANDRA-10542)
- * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
Merged from 2.1:
+ * Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
* Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
* Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb20ad4/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 54ce711,dd56b8b..dfc91f9
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -126,110 -113,73 +126,110 @@@ public class StreamReceiveTask extends
public void run()
{
- Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
- if (kscf == null)
- {
- // schema was dropped during streaming
- task.sstables.forEach(SSTableMultiWriter::abortOrDie);
-
- task.sstables.clear();
- task.txn.abort();
- return;
- }
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- boolean hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
-
++ boolean hasViews = false;
++ ColumnFamilyStore cfs = null;
try
{
+ Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+ if (kscf == null)
+ {
+ // schema was dropped during streaming
- for (SSTableWriter writer : task.sstables)
- writer.abort();
++ task.sstables.forEach(SSTableMultiWriter::abortOrDie);
+ task.sstables.clear();
++ task.txn.abort();
+ task.session.taskCompleted(task);
+ return;
+ }
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
++ cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
++ hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
+
- File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
- if (lockfiledir == null)
- throw new IOError(new IOException("All disks full"));
- StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
- lockfile.create(task.sstables);
List<SSTableReader> readers = new ArrayList<>();
- for (SSTableWriter writer : task.sstables)
- readers.add(writer.finish(true));
- lockfile.delete();
+ for (SSTableMultiWriter writer : task.sstables)
+ {
+ Collection<SSTableReader> newReaders = writer.finish(true);
+ readers.addAll(newReaders);
+ task.txn.update(newReaders, false);
+ }
+
task.sstables.clear();
try (Refs<SSTableReader> refs = Refs.ref(readers))
{
- // add sstables and build secondary indexes
- cfs.addSSTables(readers);
- cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
-
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ //We have a special path for views.
+ //Since the view requires cleaning up any pre-existing state, we must put
+ //all partitions through the same write path as normal mutations.
+ //This also ensures any 2is are also updated
+ if (hasViews)
{
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- for (SSTableReader sstable : readers)
- boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
-
- if (cfs.isRowCacheEnabled())
+ for (SSTableReader reader : readers)
{
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ try (ISSTableScanner scanner = reader.getScanner())
+ {
+ while (scanner.hasNext())
+ {
+ try (UnfilteredRowIterator rowIterator = scanner.next())
+ {
+ //Apply unsafe (we will flush below before transaction is done)
+ new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
+ }
+ }
+ }
}
+ }
+ else
+ {
+ task.txn.finish();
- if (cfs.metadata.isCounter())
+ // add sstables and build secondary indexes
+ cfs.addSSTables(readers);
+ cfs.indexManager.buildAllIndexesBlocking(readers);
+
+ //invalidate row and counter cache
+ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
{
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+ readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
+ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+ if (cfs.isRowCacheEnabled())
+ {
+ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getTableName());
+ }
+
+ if (cfs.metadata.isCounter())
+ {
+ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getTableName());
+ }
}
- }
- }
- catch (Throwable t)
- {
- logger.error("Error applying streamed sstable: ", t);
-
- JVMStabilityInspector.inspectThrowable(t);
- }
- finally
- {
- //We don't keep the streamed sstables since we've applied them manually
- //So we abort the txn and delete the streamed sstables
- if (hasViews)
- {
- cfs.forceBlockingFlush();
- task.txn.abort();
++ task.session.taskCompleted(task);
}
}
-
- task.session.taskCompleted(task);
}
+ catch (Throwable t)
+ {
+ logger.error("Error applying streamed data: ", t);
+ JVMStabilityInspector.inspectThrowable(t);
+ task.session.onError(t);
+ }
+ finally
+ {
- task.session.taskCompleted(task);
++ //We don't keep the streamed sstables since we've applied them manually
++ //So we abort the txn and delete the streamed sstables
++ if (hasViews)
++ {
++ if (cfs != null)
++ cfs.forceBlockingFlush();
++ task.txn.abort();
++ }
+ }
}
}
[06/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2491ede3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2491ede3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2491ede3
Branch: refs/heads/trunk
Commit: 2491ede3515f4b774069ffd645b0fb18f9c73630
Parents: 1b81ad1 5ba69a3
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:05:36 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:05:36 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 105 ++++++++++---------
2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2491ede3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index af1a186,3ce2da6..7541212
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,22 -1,5 +1,23 @@@
-2.1.12
+2.2.4
+ * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
+ * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
+ * Reject index queries while the index is building (CASSANDRA-8505)
+ * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
+ * Fix JSON update with prepared statements (CASSANDRA-10631)
+ * Don't do anticompaction after subrange repair (CASSANDRA-10422)
+ * Fix SimpleDateType type compatibility (CASSANDRA-10027)
+ * (Hadoop) fix splits calculation (CASSANDRA-10640)
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+ * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
+ * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
+ * Expose phi values from failure detector via JMX and tweak debug
+ and trace logging (CASSANDRA-9526)
+ * Fix RangeNamesQueryPager (CASSANDRA-10509)
+ * Deprecate Pig support (CASSANDRA-10542)
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+Merged from 2.1:
+ * Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
* Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
* Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2491ede3/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 846524b,8773cab..dd56b8b
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -37,8 -37,10 +37,9 @@@ import org.apache.cassandra.db.ColumnFa
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
@@@ -112,63 -117,73 +113,73 @@@ public class StreamReceiveTask extends
public void run()
{
- Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
- if (kscf == null)
+ try
{
- // schema was dropped during streaming
+ Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+ if (kscf == null)
+ {
+ // schema was dropped during streaming
+ for (SSTableWriter writer : task.sstables)
+ writer.abort();
+ task.sstables.clear();
+ task.session.taskCompleted(task);
+ return;
+ }
+ ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+ File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
+ if (lockfiledir == null)
+ throw new IOError(new IOException("All disks full"));
+ StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
+ lockfile.create(task.sstables);
+ List<SSTableReader> readers = new ArrayList<>();
for (SSTableWriter writer : task.sstables)
- writer.abort();
- readers.add(writer.closeAndOpenReader());
++ readers.add(writer.finish(true));
+ lockfile.delete();
task.sstables.clear();
- return;
- }
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-
- File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
- if (lockfiledir == null)
- throw new IOError(new IOException("All disks full"));
- StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
- lockfile.create(task.sstables);
- List<SSTableReader> readers = new ArrayList<>();
- for (SSTableWriter writer : task.sstables)
- readers.add(writer.finish(true));
- lockfile.delete();
- task.sstables.clear();
-
- try (Refs<SSTableReader> refs = Refs.ref(readers))
- {
- // add sstables and build secondary indexes
- cfs.addSSTables(readers);
- cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ try (Refs<SSTableReader> refs = Refs.ref(readers))
{
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- for (SSTableReader sstable : readers)
- boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+ // add sstables and build secondary indexes
+ cfs.addSSTables(readers);
+ cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- if (cfs.isRowCacheEnabled())
+ //invalidate row and counter cache
+ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
{
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
- }
-
- if (cfs.metadata.isCounter())
- {
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+ for (SSTableReader sstable : readers)
+ boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+ if (cfs.isRowCacheEnabled())
+ {
+ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
+
+ if (cfs.metadata.isCounter())
+ {
+ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
}
}
- }
- task.session.taskCompleted(task);
+ task.session.taskCompleted(task);
+ }
+ catch (Throwable t)
+ {
+ logger.error("Error applying streamed data: ", t);
+ JVMStabilityInspector.inspectThrowable(t);
+ task.session.onError(t);
+ }
}
}
[08/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2491ede3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2491ede3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2491ede3
Branch: refs/heads/cassandra-3.0
Commit: 2491ede3515f4b774069ffd645b0fb18f9c73630
Parents: 1b81ad1 5ba69a3
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:05:36 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:05:36 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 105 ++++++++++---------
2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2491ede3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index af1a186,3ce2da6..7541212
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,22 -1,5 +1,23 @@@
-2.1.12
+2.2.4
+ * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
+ * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
+ * Reject index queries while the index is building (CASSANDRA-8505)
+ * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
+ * Fix JSON update with prepared statements (CASSANDRA-10631)
+ * Don't do anticompaction after subrange repair (CASSANDRA-10422)
+ * Fix SimpleDateType type compatibility (CASSANDRA-10027)
+ * (Hadoop) fix splits calculation (CASSANDRA-10640)
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+ * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
+ * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
+ * Expose phi values from failure detector via JMX and tweak debug
+ and trace logging (CASSANDRA-9526)
+ * Fix RangeNamesQueryPager (CASSANDRA-10509)
+ * Deprecate Pig support (CASSANDRA-10542)
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+Merged from 2.1:
+ * Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
* Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
* Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2491ede3/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 846524b,8773cab..dd56b8b
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -37,8 -37,10 +37,9 @@@ import org.apache.cassandra.db.ColumnFa
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
@@@ -112,63 -117,73 +113,73 @@@ public class StreamReceiveTask extends
public void run()
{
- Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
- if (kscf == null)
+ try
{
- // schema was dropped during streaming
+ Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+ if (kscf == null)
+ {
+ // schema was dropped during streaming
+ for (SSTableWriter writer : task.sstables)
+ writer.abort();
+ task.sstables.clear();
+ task.session.taskCompleted(task);
+ return;
+ }
+ ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+ File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
+ if (lockfiledir == null)
+ throw new IOError(new IOException("All disks full"));
+ StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
+ lockfile.create(task.sstables);
+ List<SSTableReader> readers = new ArrayList<>();
for (SSTableWriter writer : task.sstables)
- writer.abort();
- readers.add(writer.closeAndOpenReader());
++ readers.add(writer.finish(true));
+ lockfile.delete();
task.sstables.clear();
- return;
- }
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-
- File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
- if (lockfiledir == null)
- throw new IOError(new IOException("All disks full"));
- StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
- lockfile.create(task.sstables);
- List<SSTableReader> readers = new ArrayList<>();
- for (SSTableWriter writer : task.sstables)
- readers.add(writer.finish(true));
- lockfile.delete();
- task.sstables.clear();
-
- try (Refs<SSTableReader> refs = Refs.ref(readers))
- {
- // add sstables and build secondary indexes
- cfs.addSSTables(readers);
- cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ try (Refs<SSTableReader> refs = Refs.ref(readers))
{
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- for (SSTableReader sstable : readers)
- boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+ // add sstables and build secondary indexes
+ cfs.addSSTables(readers);
+ cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- if (cfs.isRowCacheEnabled())
+ //invalidate row and counter cache
+ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
{
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
- }
-
- if (cfs.metadata.isCounter())
- {
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+ for (SSTableReader sstable : readers)
+ boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+ if (cfs.isRowCacheEnabled())
+ {
+ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
+
+ if (cfs.metadata.isCounter())
+ {
+ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
}
}
- }
- task.session.taskCompleted(task);
+ task.session.taskCompleted(task);
+ }
+ catch (Throwable t)
+ {
+ logger.error("Error applying streamed data: ", t);
+ JVMStabilityInspector.inspectThrowable(t);
+ task.session.onError(t);
+ }
}
}
[05/15] cassandra git commit: Add proper error handling to stream receiver
Posted by yu...@apache.org.
Add proper error handling to stream receiver
patch by Paulo Motta; reviewed by yukim for CASSANDRA-10774
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ba69a32
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ba69a32
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ba69a32
Branch: refs/heads/cassandra-3.1
Commit: 5ba69a32590074610f5516a20b8198416b79dfcf
Parents: 7650fc1
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Nov 27 16:37:37 2015 -0800
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 11:53:35 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 105 ++++++++++---------
2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a2f7b6e..3ce2da6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
* Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
* Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 738c93c..8773cab 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -116,63 +117,73 @@ public class StreamReceiveTask extends StreamTask
public void run()
{
- Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
- if (kscf == null)
+ try
{
- // schema was dropped during streaming
+ Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+ if (kscf == null)
+ {
+ // schema was dropped during streaming
+ for (SSTableWriter writer : task.sstables)
+ writer.abort();
+ task.sstables.clear();
+ task.session.taskCompleted(task);
+ return;
+ }
+ ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+ File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
+ if (lockfiledir == null)
+ throw new IOError(new IOException("All disks full"));
+ StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
+ lockfile.create(task.sstables);
+ List<SSTableReader> readers = new ArrayList<>();
for (SSTableWriter writer : task.sstables)
- writer.abort();
+ readers.add(writer.closeAndOpenReader());
+ lockfile.delete();
task.sstables.clear();
- return;
- }
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-
- File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
- if (lockfiledir == null)
- throw new IOError(new IOException("All disks full"));
- StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
- lockfile.create(task.sstables);
- List<SSTableReader> readers = new ArrayList<>();
- for (SSTableWriter writer : task.sstables)
- readers.add(writer.closeAndOpenReader());
- lockfile.delete();
- task.sstables.clear();
-
- try (Refs<SSTableReader> refs = Refs.ref(readers))
- {
- // add sstables and build secondary indexes
- cfs.addSSTables(readers);
- cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ try (Refs<SSTableReader> refs = Refs.ref(readers))
{
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- for (SSTableReader sstable : readers)
- boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+ // add sstables and build secondary indexes
+ cfs.addSSTables(readers);
+ cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- if (cfs.isRowCacheEnabled())
+ //invalidate row and counter cache
+ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
{
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
- }
-
- if (cfs.metadata.isCounter())
- {
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+ for (SSTableReader sstable : readers)
+ boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+ if (cfs.isRowCacheEnabled())
+ {
+ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
+
+ if (cfs.metadata.isCounter())
+ {
+ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
}
}
- }
- task.session.taskCompleted(task);
+ task.session.taskCompleted(task);
+ }
+ catch (Throwable t)
+ {
+ logger.error("Error applying streamed data: ", t);
+ JVMStabilityInspector.inspectThrowable(t);
+ task.session.onError(t);
+ }
}
}
[15/15] cassandra git commit: Merge branch 'cassandra-3.1' into trunk
Posted by yu...@apache.org.
Merge branch 'cassandra-3.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03863ed2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03863ed2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03863ed2
Branch: refs/heads/trunk
Commit: 03863ed2459c1c98361b500f0f24066b3c3bcc3f
Parents: 5daf8d0 5b6a368
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:08:01 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:08:01 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 60 ++++++++++----------
2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03863ed2/CHANGES.txt
----------------------------------------------------------------------
[02/15] cassandra git commit: Add proper error handling to stream receiver
Posted by yu...@apache.org.
Add proper error handling to stream receiver
patch by Paulo Motta; reviewed by yukim for CASSANDRA-10774
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ba69a32
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ba69a32
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ba69a32
Branch: refs/heads/cassandra-2.2
Commit: 5ba69a32590074610f5516a20b8198416b79dfcf
Parents: 7650fc1
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Nov 27 16:37:37 2015 -0800
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 11:53:35 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 105 ++++++++++---------
2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a2f7b6e..3ce2da6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
* Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
* Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 738c93c..8773cab 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -116,63 +117,73 @@ public class StreamReceiveTask extends StreamTask
public void run()
{
- Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
- if (kscf == null)
+ try
{
- // schema was dropped during streaming
+ Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+ if (kscf == null)
+ {
+ // schema was dropped during streaming
+ for (SSTableWriter writer : task.sstables)
+ writer.abort();
+ task.sstables.clear();
+ task.session.taskCompleted(task);
+ return;
+ }
+ ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+ File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
+ if (lockfiledir == null)
+ throw new IOError(new IOException("All disks full"));
+ StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
+ lockfile.create(task.sstables);
+ List<SSTableReader> readers = new ArrayList<>();
for (SSTableWriter writer : task.sstables)
- writer.abort();
+ readers.add(writer.closeAndOpenReader());
+ lockfile.delete();
task.sstables.clear();
- return;
- }
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-
- File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
- if (lockfiledir == null)
- throw new IOError(new IOException("All disks full"));
- StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
- lockfile.create(task.sstables);
- List<SSTableReader> readers = new ArrayList<>();
- for (SSTableWriter writer : task.sstables)
- readers.add(writer.closeAndOpenReader());
- lockfile.delete();
- task.sstables.clear();
-
- try (Refs<SSTableReader> refs = Refs.ref(readers))
- {
- // add sstables and build secondary indexes
- cfs.addSSTables(readers);
- cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ try (Refs<SSTableReader> refs = Refs.ref(readers))
{
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- for (SSTableReader sstable : readers)
- boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+ // add sstables and build secondary indexes
+ cfs.addSSTables(readers);
+ cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- if (cfs.isRowCacheEnabled())
+ //invalidate row and counter cache
+ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
{
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
- }
-
- if (cfs.metadata.isCounter())
- {
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+ for (SSTableReader sstable : readers)
+ boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+ if (cfs.isRowCacheEnabled())
+ {
+ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
+
+ if (cfs.metadata.isCounter())
+ {
+ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
}
}
- }
- task.session.taskCompleted(task);
+ task.session.taskCompleted(task);
+ }
+ catch (Throwable t)
+ {
+ logger.error("Error applying streamed data: ", t);
+ JVMStabilityInspector.inspectThrowable(t);
+ task.session.onError(t);
+ }
}
}
[10/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ccb20ad4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ccb20ad4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ccb20ad4
Branch: refs/heads/trunk
Commit: ccb20ad46ab38961aac39cc8634f450046bdf16b
Parents: 803a3d9 2491ede
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:07:39 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:07:39 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 60 ++++++++++----------
2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb20ad4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7fffbbf,7541212..a01011b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -23,7 -8,16 +23,8 @@@ Merged from 2.2
* Fix SimpleDateType type compatibility (CASSANDRA-10027)
* (Hadoop) fix splits calculation (CASSANDRA-10640)
* (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
- * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
- * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
- * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
- * Expose phi values from failure detector via JMX and tweak debug
- and trace logging (CASSANDRA-9526)
- * Fix RangeNamesQueryPager (CASSANDRA-10509)
- * Deprecate Pig support (CASSANDRA-10542)
- * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
Merged from 2.1:
+ * Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
* Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
* Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb20ad4/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 54ce711,dd56b8b..dfc91f9
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -126,110 -113,73 +126,110 @@@ public class StreamReceiveTask extends
public void run()
{
- Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
- if (kscf == null)
- {
- // schema was dropped during streaming
- task.sstables.forEach(SSTableMultiWriter::abortOrDie);
-
- task.sstables.clear();
- task.txn.abort();
- return;
- }
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- boolean hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
-
++ boolean hasViews = false;
++ ColumnFamilyStore cfs = null;
try
{
+ Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+ if (kscf == null)
+ {
+ // schema was dropped during streaming
- for (SSTableWriter writer : task.sstables)
- writer.abort();
++ task.sstables.forEach(SSTableMultiWriter::abortOrDie);
+ task.sstables.clear();
++ task.txn.abort();
+ task.session.taskCompleted(task);
+ return;
+ }
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
++ cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
++ hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
+
- File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
- if (lockfiledir == null)
- throw new IOError(new IOException("All disks full"));
- StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
- lockfile.create(task.sstables);
List<SSTableReader> readers = new ArrayList<>();
- for (SSTableWriter writer : task.sstables)
- readers.add(writer.finish(true));
- lockfile.delete();
+ for (SSTableMultiWriter writer : task.sstables)
+ {
+ Collection<SSTableReader> newReaders = writer.finish(true);
+ readers.addAll(newReaders);
+ task.txn.update(newReaders, false);
+ }
+
task.sstables.clear();
try (Refs<SSTableReader> refs = Refs.ref(readers))
{
- // add sstables and build secondary indexes
- cfs.addSSTables(readers);
- cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
-
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ //We have a special path for views.
+ //Since the view requires cleaning up any pre-existing state, we must put
+ //all partitions through the same write path as normal mutations.
+ //This also ensures any 2is are also updated
+ if (hasViews)
{
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- for (SSTableReader sstable : readers)
- boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
-
- if (cfs.isRowCacheEnabled())
+ for (SSTableReader reader : readers)
{
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ try (ISSTableScanner scanner = reader.getScanner())
+ {
+ while (scanner.hasNext())
+ {
+ try (UnfilteredRowIterator rowIterator = scanner.next())
+ {
+ //Apply unsafe (we will flush below before transaction is done)
+ new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
+ }
+ }
+ }
}
+ }
+ else
+ {
+ task.txn.finish();
- if (cfs.metadata.isCounter())
+ // add sstables and build secondary indexes
+ cfs.addSSTables(readers);
+ cfs.indexManager.buildAllIndexesBlocking(readers);
+
+ //invalidate row and counter cache
+ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
{
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+ readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
+ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+ if (cfs.isRowCacheEnabled())
+ {
+ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getTableName());
+ }
+
+ if (cfs.metadata.isCounter())
+ {
+ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getTableName());
+ }
}
- }
- }
- catch (Throwable t)
- {
- logger.error("Error applying streamed sstable: ", t);
-
- JVMStabilityInspector.inspectThrowable(t);
- }
- finally
- {
- //We don't keep the streamed sstables since we've applied them manually
- //So we abort the txn and delete the streamed sstables
- if (hasViews)
- {
- cfs.forceBlockingFlush();
- task.txn.abort();
++ task.session.taskCompleted(task);
}
}
-
- task.session.taskCompleted(task);
}
+ catch (Throwable t)
+ {
+ logger.error("Error applying streamed data: ", t);
+ JVMStabilityInspector.inspectThrowable(t);
+ task.session.onError(t);
+ }
+ finally
+ {
- task.session.taskCompleted(task);
++ //We don't keep the streamed sstables since we've applied them manually
++ //So we abort the txn and delete the streamed sstables
++ if (hasViews)
++ {
++ if (cfs != null)
++ cfs.forceBlockingFlush();
++ task.txn.abort();
++ }
+ }
}
}
[13/15] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.1
Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b6a368c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b6a368c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b6a368c
Branch: refs/heads/cassandra-3.1
Commit: 5b6a368c90233072c419e94bd3c3cb8c1362376e
Parents: 6bda886 ccb20ad
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:07:51 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:07:51 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 60 ++++++++++----------
2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b6a368c/CHANGES.txt
----------------------------------------------------------------------
[12/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ccb20ad4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ccb20ad4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ccb20ad4
Branch: refs/heads/cassandra-3.0
Commit: ccb20ad46ab38961aac39cc8634f450046bdf16b
Parents: 803a3d9 2491ede
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:07:39 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:07:39 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 60 ++++++++++----------
2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb20ad4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7fffbbf,7541212..a01011b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -23,7 -8,16 +23,8 @@@ Merged from 2.2
* Fix SimpleDateType type compatibility (CASSANDRA-10027)
* (Hadoop) fix splits calculation (CASSANDRA-10640)
* (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
- * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
- * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
- * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
- * Expose phi values from failure detector via JMX and tweak debug
- and trace logging (CASSANDRA-9526)
- * Fix RangeNamesQueryPager (CASSANDRA-10509)
- * Deprecate Pig support (CASSANDRA-10542)
- * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
Merged from 2.1:
+ * Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
* Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
* Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb20ad4/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 54ce711,dd56b8b..dfc91f9
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -126,110 -113,73 +126,110 @@@ public class StreamReceiveTask extends
public void run()
{
- Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
- if (kscf == null)
- {
- // schema was dropped during streaming
- task.sstables.forEach(SSTableMultiWriter::abortOrDie);
-
- task.sstables.clear();
- task.txn.abort();
- return;
- }
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- boolean hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
-
++ boolean hasViews = false;
++ ColumnFamilyStore cfs = null;
try
{
+ Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+ if (kscf == null)
+ {
+ // schema was dropped during streaming
- for (SSTableWriter writer : task.sstables)
- writer.abort();
++ task.sstables.forEach(SSTableMultiWriter::abortOrDie);
+ task.sstables.clear();
++ task.txn.abort();
+ task.session.taskCompleted(task);
+ return;
+ }
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
++ cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
++ hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
+
- File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
- if (lockfiledir == null)
- throw new IOError(new IOException("All disks full"));
- StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
- lockfile.create(task.sstables);
List<SSTableReader> readers = new ArrayList<>();
- for (SSTableWriter writer : task.sstables)
- readers.add(writer.finish(true));
- lockfile.delete();
+ for (SSTableMultiWriter writer : task.sstables)
+ {
+ Collection<SSTableReader> newReaders = writer.finish(true);
+ readers.addAll(newReaders);
+ task.txn.update(newReaders, false);
+ }
+
task.sstables.clear();
try (Refs<SSTableReader> refs = Refs.ref(readers))
{
- // add sstables and build secondary indexes
- cfs.addSSTables(readers);
- cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
-
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ //We have a special path for views.
+ //Since the view requires cleaning up any pre-existing state, we must put
+ //all partitions through the same write path as normal mutations.
+ //This also ensures any 2is are also updated
+ if (hasViews)
{
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- for (SSTableReader sstable : readers)
- boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
-
- if (cfs.isRowCacheEnabled())
+ for (SSTableReader reader : readers)
{
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ try (ISSTableScanner scanner = reader.getScanner())
+ {
+ while (scanner.hasNext())
+ {
+ try (UnfilteredRowIterator rowIterator = scanner.next())
+ {
+ //Apply unsafe (we will flush below before transaction is done)
+ new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
+ }
+ }
+ }
}
+ }
+ else
+ {
+ task.txn.finish();
- if (cfs.metadata.isCounter())
+ // add sstables and build secondary indexes
+ cfs.addSSTables(readers);
+ cfs.indexManager.buildAllIndexesBlocking(readers);
+
+ //invalidate row and counter cache
+ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
{
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+ readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
+ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+ if (cfs.isRowCacheEnabled())
+ {
+ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getTableName());
+ }
+
+ if (cfs.metadata.isCounter())
+ {
+ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getTableName());
+ }
}
- }
- }
- catch (Throwable t)
- {
- logger.error("Error applying streamed sstable: ", t);
-
- JVMStabilityInspector.inspectThrowable(t);
- }
- finally
- {
- //We don't keep the streamed sstables since we've applied them manually
- //So we abort the txn and delete the streamed sstables
- if (hasViews)
- {
- cfs.forceBlockingFlush();
- task.txn.abort();
++ task.session.taskCompleted(task);
}
}
-
- task.session.taskCompleted(task);
}
+ catch (Throwable t)
+ {
+ logger.error("Error applying streamed data: ", t);
+ JVMStabilityInspector.inspectThrowable(t);
+ task.session.onError(t);
+ }
+ finally
+ {
- task.session.taskCompleted(task);
++ //We don't keep the streamed sstables since we've applied them manually
++ //So we abort the txn and delete the streamed sstables
++ if (hasViews)
++ {
++ if (cfs != null)
++ cfs.forceBlockingFlush();
++ task.txn.abort();
++ }
+ }
}
}
[07/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2491ede3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2491ede3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2491ede3
Branch: refs/heads/cassandra-2.2
Commit: 2491ede3515f4b774069ffd645b0fb18f9c73630
Parents: 1b81ad1 5ba69a3
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:05:36 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:05:36 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 105 ++++++++++---------
2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2491ede3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index af1a186,3ce2da6..7541212
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,22 -1,5 +1,23 @@@
-2.1.12
+2.2.4
+ * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
+ * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
+ * Reject index queries while the index is building (CASSANDRA-8505)
+ * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
+ * Fix JSON update with prepared statements (CASSANDRA-10631)
+ * Don't do anticompaction after subrange repair (CASSANDRA-10422)
+ * Fix SimpleDateType type compatibility (CASSANDRA-10027)
+ * (Hadoop) fix splits calculation (CASSANDRA-10640)
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+ * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
+ * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
+ * Expose phi values from failure detector via JMX and tweak debug
+ and trace logging (CASSANDRA-9526)
+ * Fix RangeNamesQueryPager (CASSANDRA-10509)
+ * Deprecate Pig support (CASSANDRA-10542)
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+Merged from 2.1:
+ * Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
* Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
* Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2491ede3/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 846524b,8773cab..dd56b8b
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -37,8 -37,10 +37,9 @@@ import org.apache.cassandra.db.ColumnFa
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
@@@ -112,63 -117,73 +113,73 @@@ public class StreamReceiveTask extends
public void run()
{
- Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
- if (kscf == null)
+ try
{
- // schema was dropped during streaming
+ Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+ if (kscf == null)
+ {
+ // schema was dropped during streaming
+ for (SSTableWriter writer : task.sstables)
+ writer.abort();
+ task.sstables.clear();
+ task.session.taskCompleted(task);
+ return;
+ }
+ ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+ File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
+ if (lockfiledir == null)
+ throw new IOError(new IOException("All disks full"));
+ StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
+ lockfile.create(task.sstables);
+ List<SSTableReader> readers = new ArrayList<>();
for (SSTableWriter writer : task.sstables)
- writer.abort();
- readers.add(writer.closeAndOpenReader());
++ readers.add(writer.finish(true));
+ lockfile.delete();
task.sstables.clear();
- return;
- }
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-
- File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
- if (lockfiledir == null)
- throw new IOError(new IOException("All disks full"));
- StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
- lockfile.create(task.sstables);
- List<SSTableReader> readers = new ArrayList<>();
- for (SSTableWriter writer : task.sstables)
- readers.add(writer.finish(true));
- lockfile.delete();
- task.sstables.clear();
-
- try (Refs<SSTableReader> refs = Refs.ref(readers))
- {
- // add sstables and build secondary indexes
- cfs.addSSTables(readers);
- cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ try (Refs<SSTableReader> refs = Refs.ref(readers))
{
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- for (SSTableReader sstable : readers)
- boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+ // add sstables and build secondary indexes
+ cfs.addSSTables(readers);
+ cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- if (cfs.isRowCacheEnabled())
+ //invalidate row and counter cache
+ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
{
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
- }
-
- if (cfs.metadata.isCounter())
- {
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+ for (SSTableReader sstable : readers)
+ boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+ if (cfs.isRowCacheEnabled())
+ {
+ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
+
+ if (cfs.metadata.isCounter())
+ {
+ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
}
}
- }
- task.session.taskCompleted(task);
+ task.session.taskCompleted(task);
+ }
+ catch (Throwable t)
+ {
+ logger.error("Error applying streamed data: ", t);
+ JVMStabilityInspector.inspectThrowable(t);
+ task.session.onError(t);
+ }
}
}
[03/15] cassandra git commit: Add proper error handling to stream
receiver
Posted by yu...@apache.org.
Add proper error handling to stream receiver
patch by Paulo Motta; reviewed by yukim for CASSANDRA-10774
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ba69a32
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ba69a32
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ba69a32
Branch: refs/heads/trunk
Commit: 5ba69a32590074610f5516a20b8198416b79dfcf
Parents: 7650fc1
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Nov 27 16:37:37 2015 -0800
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 11:53:35 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 105 ++++++++++---------
2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a2f7b6e..3ce2da6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
* Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
* Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 738c93c..8773cab 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -116,63 +117,73 @@ public class StreamReceiveTask extends StreamTask
public void run()
{
- Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
- if (kscf == null)
+ try
{
- // schema was dropped during streaming
+ Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+ if (kscf == null)
+ {
+ // schema was dropped during streaming
+ for (SSTableWriter writer : task.sstables)
+ writer.abort();
+ task.sstables.clear();
+ task.session.taskCompleted(task);
+ return;
+ }
+ ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+ File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
+ if (lockfiledir == null)
+ throw new IOError(new IOException("All disks full"));
+ StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
+ lockfile.create(task.sstables);
+ List<SSTableReader> readers = new ArrayList<>();
for (SSTableWriter writer : task.sstables)
- writer.abort();
+ readers.add(writer.closeAndOpenReader());
+ lockfile.delete();
task.sstables.clear();
- return;
- }
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-
- File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
- if (lockfiledir == null)
- throw new IOError(new IOException("All disks full"));
- StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
- lockfile.create(task.sstables);
- List<SSTableReader> readers = new ArrayList<>();
- for (SSTableWriter writer : task.sstables)
- readers.add(writer.closeAndOpenReader());
- lockfile.delete();
- task.sstables.clear();
-
- try (Refs<SSTableReader> refs = Refs.ref(readers))
- {
- // add sstables and build secondary indexes
- cfs.addSSTables(readers);
- cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ try (Refs<SSTableReader> refs = Refs.ref(readers))
{
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- for (SSTableReader sstable : readers)
- boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+ // add sstables and build secondary indexes
+ cfs.addSSTables(readers);
+ cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- if (cfs.isRowCacheEnabled())
+ //invalidate row and counter cache
+ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
{
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
- }
-
- if (cfs.metadata.isCounter())
- {
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+ for (SSTableReader sstable : readers)
+ boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+ if (cfs.isRowCacheEnabled())
+ {
+ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
+
+ if (cfs.metadata.isCounter())
+ {
+ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
}
}
- }
- task.session.taskCompleted(task);
+ task.session.taskCompleted(task);
+ }
+ catch (Throwable t)
+ {
+ logger.error("Error applying streamed data: ", t);
+ JVMStabilityInspector.inspectThrowable(t);
+ task.session.onError(t);
+ }
}
}
[04/15] cassandra git commit: Add proper error handling to stream
receiver
Posted by yu...@apache.org.
Add proper error handling to stream receiver
patch by Paulo Motta; reviewed by yukim for CASSANDRA-10774
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ba69a32
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ba69a32
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ba69a32
Branch: refs/heads/cassandra-3.0
Commit: 5ba69a32590074610f5516a20b8198416b79dfcf
Parents: 7650fc1
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Nov 27 16:37:37 2015 -0800
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 11:53:35 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 105 ++++++++++---------
2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a2f7b6e..3ce2da6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
* Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
* Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 738c93c..8773cab 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -116,63 +117,73 @@ public class StreamReceiveTask extends StreamTask
public void run()
{
- Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
- if (kscf == null)
+ try
{
- // schema was dropped during streaming
+ Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+ if (kscf == null)
+ {
+ // schema was dropped during streaming
+ for (SSTableWriter writer : task.sstables)
+ writer.abort();
+ task.sstables.clear();
+ task.session.taskCompleted(task);
+ return;
+ }
+ ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+ File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
+ if (lockfiledir == null)
+ throw new IOError(new IOException("All disks full"));
+ StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
+ lockfile.create(task.sstables);
+ List<SSTableReader> readers = new ArrayList<>();
for (SSTableWriter writer : task.sstables)
- writer.abort();
+ readers.add(writer.closeAndOpenReader());
+ lockfile.delete();
task.sstables.clear();
- return;
- }
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-
- File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
- if (lockfiledir == null)
- throw new IOError(new IOException("All disks full"));
- StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
- lockfile.create(task.sstables);
- List<SSTableReader> readers = new ArrayList<>();
- for (SSTableWriter writer : task.sstables)
- readers.add(writer.closeAndOpenReader());
- lockfile.delete();
- task.sstables.clear();
-
- try (Refs<SSTableReader> refs = Refs.ref(readers))
- {
- // add sstables and build secondary indexes
- cfs.addSSTables(readers);
- cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ try (Refs<SSTableReader> refs = Refs.ref(readers))
{
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- for (SSTableReader sstable : readers)
- boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+ // add sstables and build secondary indexes
+ cfs.addSSTables(readers);
+ cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- if (cfs.isRowCacheEnabled())
+ //invalidate row and counter cache
+ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
{
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
- }
-
- if (cfs.metadata.isCounter())
- {
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+ for (SSTableReader sstable : readers)
+ boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+ if (cfs.isRowCacheEnabled())
+ {
+ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
+
+ if (cfs.metadata.isCounter())
+ {
+ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
}
}
- }
- task.session.taskCompleted(task);
+ task.session.taskCompleted(task);
+ }
+ catch (Throwable t)
+ {
+ logger.error("Error applying streamed data: ", t);
+ JVMStabilityInspector.inspectThrowable(t);
+ task.session.onError(t);
+ }
}
}
[14/15] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.1
Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b6a368c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b6a368c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b6a368c
Branch: refs/heads/trunk
Commit: 5b6a368c90233072c419e94bd3c3cb8c1362376e
Parents: 6bda886 ccb20ad
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:07:51 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:07:51 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 60 ++++++++++----------
2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b6a368c/CHANGES.txt
----------------------------------------------------------------------
[09/15] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2491ede3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2491ede3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2491ede3
Branch: refs/heads/cassandra-3.1
Commit: 2491ede3515f4b774069ffd645b0fb18f9c73630
Parents: 1b81ad1 5ba69a3
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:05:36 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:05:36 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 105 ++++++++++---------
2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2491ede3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index af1a186,3ce2da6..7541212
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,22 -1,5 +1,23 @@@
-2.1.12
+2.2.4
+ * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
+ * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
+ * Reject index queries while the index is building (CASSANDRA-8505)
+ * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
+ * Fix JSON update with prepared statements (CASSANDRA-10631)
+ * Don't do anticompaction after subrange repair (CASSANDRA-10422)
+ * Fix SimpleDateType type compatibility (CASSANDRA-10027)
+ * (Hadoop) fix splits calculation (CASSANDRA-10640)
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+ * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
+ * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
+ * Expose phi values from failure detector via JMX and tweak debug
+ and trace logging (CASSANDRA-9526)
+ * Fix RangeNamesQueryPager (CASSANDRA-10509)
+ * Deprecate Pig support (CASSANDRA-10542)
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+Merged from 2.1:
+ * Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
* Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
* Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2491ede3/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 846524b,8773cab..dd56b8b
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -37,8 -37,10 +37,9 @@@ import org.apache.cassandra.db.ColumnFa
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
@@@ -112,63 -117,73 +113,73 @@@ public class StreamReceiveTask extends
public void run()
{
- Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
- if (kscf == null)
+ try
{
- // schema was dropped during streaming
+ Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+ if (kscf == null)
+ {
+ // schema was dropped during streaming
+ for (SSTableWriter writer : task.sstables)
+ writer.abort();
+ task.sstables.clear();
+ task.session.taskCompleted(task);
+ return;
+ }
+ ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+ File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
+ if (lockfiledir == null)
+ throw new IOError(new IOException("All disks full"));
+ StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
+ lockfile.create(task.sstables);
+ List<SSTableReader> readers = new ArrayList<>();
for (SSTableWriter writer : task.sstables)
- writer.abort();
- readers.add(writer.closeAndOpenReader());
++ readers.add(writer.finish(true));
+ lockfile.delete();
task.sstables.clear();
- return;
- }
- ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-
- File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
- if (lockfiledir == null)
- throw new IOError(new IOException("All disks full"));
- StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
- lockfile.create(task.sstables);
- List<SSTableReader> readers = new ArrayList<>();
- for (SSTableWriter writer : task.sstables)
- readers.add(writer.finish(true));
- lockfile.delete();
- task.sstables.clear();
-
- try (Refs<SSTableReader> refs = Refs.ref(readers))
- {
- // add sstables and build secondary indexes
- cfs.addSSTables(readers);
- cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ try (Refs<SSTableReader> refs = Refs.ref(readers))
{
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- for (SSTableReader sstable : readers)
- boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+ // add sstables and build secondary indexes
+ cfs.addSSTables(readers);
+ cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
- if (cfs.isRowCacheEnabled())
+ //invalidate row and counter cache
+ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
{
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
- }
-
- if (cfs.metadata.isCounter())
- {
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+ for (SSTableReader sstable : readers)
+ boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+ if (cfs.isRowCacheEnabled())
+ {
+ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
+
+ if (cfs.metadata.isCounter())
+ {
+ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
}
}
- }
- task.session.taskCompleted(task);
+ task.session.taskCompleted(task);
+ }
+ catch (Throwable t)
+ {
+ logger.error("Error applying streamed data: ", t);
+ JVMStabilityInspector.inspectThrowable(t);
+ task.session.onError(t);
+ }
}
}