You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/02/22 09:43:32 UTC

[2/6] git commit: Fix bug with counter in super columns

Fix bug with counter in super columns

patch by slebresne; reviewed by yukim for CASSANDRA-3821


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f90a799
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f90a799
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f90a799

Branch: refs/heads/trunk
Commit: 9f90a7997a08848e48f26526f9ffe7ae859d046c
Parents: 64d2792
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Feb 22 09:41:31 2012 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Feb 22 09:41:31 2012 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/db/CollationController.java   |    6 +-
 src/java/org/apache/cassandra/db/Memtable.java     |    3 +-
 .../cassandra/db/ThreadSafeSortedColumns.java      |  184 +++++++++++++++
 4 files changed, 191 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f90a799/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e93778b..ade1ba8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
  * Show Effective Owership via Nodetool ring <keyspace> (CASSANDRA-3412)
  * Update ORDER BY syntax for CQL3 (CASSANDRA-3925)
  * Fix BulkRecordWriter to not throw NPE if reducer gets no map data from Hadoop (CASSANDRA-3944)
+ * Fix bug with counters in super columns (CASSANDRA-3821)
 Merged from 1.0:
  * remove the wait on hint future during write (CASSANDRA-3870)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f90a799/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index dd7464d..c9589bc 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -75,8 +75,9 @@ public class CollationController
     {
         logger.debug("collectTimeOrderedData");
 
+        // AtomicSortedColumns doesn't work for super columns (see #3821)
         ISortedColumns.Factory factory = mutableColumns
-                                       ? AtomicSortedColumns.factory()
+                                       ? cfs.metadata.cfType == ColumnFamilyType.Super ? ThreadSafeSortedColumns.factory() : AtomicSortedColumns.factory()
                                        : TreeMapBackedSortedColumns.factory();
         ColumnFamily container = ColumnFamily.create(cfs.metadata, factory, filter.filter.isReversed());
         List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
@@ -208,8 +209,9 @@ public class CollationController
     private ColumnFamily collectAllData()
     {
         logger.debug("collectAllData");
+        // AtomicSortedColumns doesn't work for super columns (see #3821)
         ISortedColumns.Factory factory = mutableColumns
-                                       ? AtomicSortedColumns.factory()
+                                       ? cfs.metadata.cfType == ColumnFamilyType.Super ? ThreadSafeSortedColumns.factory() : AtomicSortedColumns.factory()
                                        : ArrayBackedSortedColumns.factory();
         List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
         ColumnFamily returnCF = ColumnFamily.create(cfs.metadata, factory, filter.filter.isReversed());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f90a799/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index c0c6289..ddf2179 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -223,7 +223,8 @@ public class Memtable
 
         if (previous == null)
         {
-            ColumnFamily empty = cf.cloneMeShallow(AtomicSortedColumns.factory(), false);
+            // AtomicSortedColumns doesn't work for super columns (see #3821)
+            ColumnFamily empty = cf.cloneMeShallow(cf.isSuper() ? ThreadSafeSortedColumns.factory() : AtomicSortedColumns.factory(), false);
             // We'll add the columns later. This avoids wasting works if we get beaten in the putIfAbsent
             previous = columnFamilies.putIfAbsent(new DecoratedKey(key.token, allocator.clone(key.key)), empty);
             if (previous == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f90a799/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
new file mode 100644
index 0000000..c82c658
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import com.google.common.base.Function;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.Allocator;
+
+public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns implements ISortedColumns
+{
+    private final ConcurrentSkipListMap<ByteBuffer, IColumn> map;
+
+    public static final ISortedColumns.Factory factory = new Factory()
+    {
+        public ISortedColumns create(AbstractType<?> comparator, boolean insertReversed)
+        {
+            return new ThreadSafeSortedColumns(comparator);
+        }
+
+        public ISortedColumns fromSorted(SortedMap<ByteBuffer, IColumn> sortedMap, boolean insertReversed)
+        {
+            return new ThreadSafeSortedColumns(sortedMap);
+        }
+    };
+
+    public static ISortedColumns.Factory factory()
+    {
+        return factory;
+    }
+
+    public AbstractType<?> getComparator()
+    {
+        return (AbstractType<?>)map.comparator();
+    }
+
+    private ThreadSafeSortedColumns(AbstractType<?> comparator)
+    {
+        this.map = new ConcurrentSkipListMap<ByteBuffer, IColumn>(comparator);
+    }
+
+    private ThreadSafeSortedColumns(SortedMap<ByteBuffer, IColumn> columns)
+    {
+        this.map = new ConcurrentSkipListMap<ByteBuffer, IColumn>(columns);
+    }
+
+    public ISortedColumns.Factory getFactory()
+    {
+        return factory();
+    }
+
+    public ISortedColumns cloneMe()
+    {
+        return new ThreadSafeSortedColumns(map);
+    }
+
+    public boolean isInsertReversed()
+    {
+        return false;
+    }
+
+    /*
+     * If we find an old column that has the same name
+     * the ask it to resolve itself else add the new column
+    */
+    public void addColumn(IColumn column, Allocator allocator)
+    {
+        ByteBuffer name = column.name();
+        IColumn oldColumn;
+        while ((oldColumn = map.putIfAbsent(name, column)) != null)
+        {
+            if (oldColumn instanceof SuperColumn)
+            {
+                assert column instanceof SuperColumn;
+                ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator);
+                break;  // Delegated to SuperColumn
+            }
+            else
+            {
+                // calculate reconciled col from old (existing) col and new col
+                IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
+                if (map.replace(name, oldColumn, reconciledColumn))
+                    break;
+
+                // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
+                // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
+            }
+        }
+    }
+
+    /**
+     * We need to go through each column in the column container and resolve it before adding
+     */
+    protected void addAllColumns(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
+    {
+        for (IColumn column : cm.getSortedColumns())
+            addColumn(transformation.apply(column), allocator);
+    }
+
+    public boolean replace(IColumn oldColumn, IColumn newColumn)
+    {
+        if (!oldColumn.name().equals(newColumn.name()))
+            throw new IllegalArgumentException();
+
+        return map.replace(oldColumn.name(), oldColumn, newColumn);
+    }
+
+    public IColumn getColumn(ByteBuffer name)
+    {
+        return map.get(name);
+    }
+
+    public void removeColumn(ByteBuffer name)
+    {
+        map.remove(name);
+    }
+
+    public void clear()
+    {
+        map.clear();
+    }
+
+    public int size()
+    {
+        return map.size();
+    }
+
+    public Collection<IColumn> getSortedColumns()
+    {
+        return map.values();
+    }
+
+    public Collection<IColumn> getReverseSortedColumns()
+    {
+        return map.descendingMap().values();
+    }
+
+    public SortedSet<ByteBuffer> getColumnNames()
+    {
+        return map.navigableKeySet();
+    }
+
+    public Iterator<IColumn> iterator()
+    {
+        return map.values().iterator();
+    }
+
+    public Iterator<IColumn> reverseIterator()
+    {
+        return getReverseSortedColumns().iterator();
+    }
+
+    public Iterator<IColumn> iterator(ByteBuffer start)
+    {
+        return map.tailMap(start).values().iterator();
+    }
+
+    public Iterator<IColumn> reverseIterator(ByteBuffer start)
+    {
+        return map.descendingMap().tailMap(start).values().iterator();
+    }
+}