You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/02/22 09:42:17 UTC
git commit: Fix bug with counter in super columns
Updated Branches:
refs/heads/cassandra-1.1 64d279289 -> 9f90a7997
Fix bug with counter in super columns
patch by slebresne; reviewed by yukim for CASSANDRA-3821
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f90a799
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f90a799
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f90a799
Branch: refs/heads/cassandra-1.1
Commit: 9f90a7997a08848e48f26526f9ffe7ae859d046c
Parents: 64d2792
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Feb 22 09:41:31 2012 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Feb 22 09:41:31 2012 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/CollationController.java | 6 +-
src/java/org/apache/cassandra/db/Memtable.java | 3 +-
.../cassandra/db/ThreadSafeSortedColumns.java | 184 +++++++++++++++
4 files changed, 191 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f90a799/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e93778b..ade1ba8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
* Show Effective Owership via Nodetool ring <keyspace> (CASSANDRA-3412)
* Update ORDER BY syntax for CQL3 (CASSANDRA-3925)
* Fix BulkRecordWriter to not throw NPE if reducer gets no map data from Hadoop (CASSANDRA-3944)
+ * Fix bug with counters in super columns (CASSANDRA-3821)
Merged from 1.0:
* remove the wait on hint future during write (CASSANDRA-3870)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f90a799/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index dd7464d..c9589bc 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -75,8 +75,9 @@ public class CollationController
{
logger.debug("collectTimeOrderedData");
+ // AtomicSortedColumns doesn't work for super columns (see #3821)
ISortedColumns.Factory factory = mutableColumns
- ? AtomicSortedColumns.factory()
+ ? cfs.metadata.cfType == ColumnFamilyType.Super ? ThreadSafeSortedColumns.factory() : AtomicSortedColumns.factory()
: TreeMapBackedSortedColumns.factory();
ColumnFamily container = ColumnFamily.create(cfs.metadata, factory, filter.filter.isReversed());
List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
@@ -208,8 +209,9 @@ public class CollationController
private ColumnFamily collectAllData()
{
logger.debug("collectAllData");
+ // AtomicSortedColumns doesn't work for super columns (see #3821)
ISortedColumns.Factory factory = mutableColumns
- ? AtomicSortedColumns.factory()
+ ? cfs.metadata.cfType == ColumnFamilyType.Super ? ThreadSafeSortedColumns.factory() : AtomicSortedColumns.factory()
: ArrayBackedSortedColumns.factory();
List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
ColumnFamily returnCF = ColumnFamily.create(cfs.metadata, factory, filter.filter.isReversed());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f90a799/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index c0c6289..ddf2179 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -223,7 +223,8 @@ public class Memtable
if (previous == null)
{
- ColumnFamily empty = cf.cloneMeShallow(AtomicSortedColumns.factory(), false);
+ // AtomicSortedColumns doesn't work for super columns (see #3821)
+ ColumnFamily empty = cf.cloneMeShallow(cf.isSuper() ? ThreadSafeSortedColumns.factory() : AtomicSortedColumns.factory(), false);
// We'll add the columns later. This avoids wasting works if we get beaten in the putIfAbsent
previous = columnFamilies.putIfAbsent(new DecoratedKey(key.token, allocator.clone(key.key)), empty);
if (previous == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f90a799/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
new file mode 100644
index 0000000..c82c658
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import com.google.common.base.Function;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.Allocator;
+
+public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns implements ISortedColumns
+{
+ private final ConcurrentSkipListMap<ByteBuffer, IColumn> map;
+
+ public static final ISortedColumns.Factory factory = new Factory()
+ {
+ public ISortedColumns create(AbstractType<?> comparator, boolean insertReversed)
+ {
+ return new ThreadSafeSortedColumns(comparator);
+ }
+
+ public ISortedColumns fromSorted(SortedMap<ByteBuffer, IColumn> sortedMap, boolean insertReversed)
+ {
+ return new ThreadSafeSortedColumns(sortedMap);
+ }
+ };
+
+ public static ISortedColumns.Factory factory()
+ {
+ return factory;
+ }
+
+ public AbstractType<?> getComparator()
+ {
+ return (AbstractType<?>)map.comparator();
+ }
+
+ private ThreadSafeSortedColumns(AbstractType<?> comparator)
+ {
+ this.map = new ConcurrentSkipListMap<ByteBuffer, IColumn>(comparator);
+ }
+
+ private ThreadSafeSortedColumns(SortedMap<ByteBuffer, IColumn> columns)
+ {
+ this.map = new ConcurrentSkipListMap<ByteBuffer, IColumn>(columns);
+ }
+
+ public ISortedColumns.Factory getFactory()
+ {
+ return factory();
+ }
+
+ public ISortedColumns cloneMe()
+ {
+ return new ThreadSafeSortedColumns(map);
+ }
+
+ public boolean isInsertReversed()
+ {
+ return false;
+ }
+
+ /*
+ * If we find an old column that has the same name
+ * the ask it to resolve itself else add the new column
+ */
+ public void addColumn(IColumn column, Allocator allocator)
+ {
+ ByteBuffer name = column.name();
+ IColumn oldColumn;
+ while ((oldColumn = map.putIfAbsent(name, column)) != null)
+ {
+ if (oldColumn instanceof SuperColumn)
+ {
+ assert column instanceof SuperColumn;
+ ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator);
+ break; // Delegated to SuperColumn
+ }
+ else
+ {
+ // calculate reconciled col from old (existing) col and new col
+ IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
+ if (map.replace(name, oldColumn, reconciledColumn))
+ break;
+
+ // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
+ // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
+ }
+ }
+ }
+
+ /**
+ * We need to go through each column in the column container and resolve it before adding
+ */
+ protected void addAllColumns(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
+ {
+ for (IColumn column : cm.getSortedColumns())
+ addColumn(transformation.apply(column), allocator);
+ }
+
+ public boolean replace(IColumn oldColumn, IColumn newColumn)
+ {
+ if (!oldColumn.name().equals(newColumn.name()))
+ throw new IllegalArgumentException();
+
+ return map.replace(oldColumn.name(), oldColumn, newColumn);
+ }
+
+ public IColumn getColumn(ByteBuffer name)
+ {
+ return map.get(name);
+ }
+
+ public void removeColumn(ByteBuffer name)
+ {
+ map.remove(name);
+ }
+
+ public void clear()
+ {
+ map.clear();
+ }
+
+ public int size()
+ {
+ return map.size();
+ }
+
+ public Collection<IColumn> getSortedColumns()
+ {
+ return map.values();
+ }
+
+ public Collection<IColumn> getReverseSortedColumns()
+ {
+ return map.descendingMap().values();
+ }
+
+ public SortedSet<ByteBuffer> getColumnNames()
+ {
+ return map.navigableKeySet();
+ }
+
+ public Iterator<IColumn> iterator()
+ {
+ return map.values().iterator();
+ }
+
+ public Iterator<IColumn> reverseIterator()
+ {
+ return getReverseSortedColumns().iterator();
+ }
+
+ public Iterator<IColumn> iterator(ByteBuffer start)
+ {
+ return map.tailMap(start).values().iterator();
+ }
+
+ public Iterator<IColumn> reverseIterator(ByteBuffer start)
+ {
+ return map.descendingMap().tailMap(start).values().iterator();
+ }
+}