You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/11/26 21:28:25 UTC
[1/6] git commit: fix build
Updated Branches:
refs/heads/cassandra-1.2 8145c8356 -> cc8a05ab6
refs/heads/cassandra-2.0 504f66dc1 -> e68d466eb
refs/heads/trunk 1bfd062fd -> c384d31b8
fix build
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc8a05ab
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc8a05ab
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc8a05ab
Branch: refs/heads/cassandra-1.2
Commit: cc8a05ab6ac22f019e60ec79c11338d4c77d49c3
Parents: 8145c83
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Nov 26 14:27:52 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Nov 26 14:27:52 2013 -0600
----------------------------------------------------------------------
src/java/org/apache/cassandra/db/Table.java | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8a05ab/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index a851eee..e6df982 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -275,9 +275,6 @@ public class Table
public void createReplicationStrategy(KSMetaData ksm)
{
- if (replicationStrategy != null)
- StorageService.instance.getTokenMetadata().unregister(replicationStrategy);
-
replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(ksm.name,
ksm.strategyClass,
StorageService.instance.getTokenMetadata(),
[2/6] git commit: fix build
Posted by jb...@apache.org.
fix build
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc8a05ab
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc8a05ab
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc8a05ab
Branch: refs/heads/cassandra-2.0
Commit: cc8a05ab6ac22f019e60ec79c11338d4c77d49c3
Parents: 8145c83
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Nov 26 14:27:52 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Nov 26 14:27:52 2013 -0600
----------------------------------------------------------------------
src/java/org/apache/cassandra/db/Table.java | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8a05ab/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index a851eee..e6df982 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -275,9 +275,6 @@ public class Table
public void createReplicationStrategy(KSMetaData ksm)
{
- if (replicationStrategy != null)
- StorageService.instance.getTokenMetadata().unregister(replicationStrategy);
-
replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(ksm.name,
ksm.strategyClass,
StorageService.instance.getTokenMetadata(),
[4/6] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Posted by jb...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e68d466e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e68d466e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e68d466e
Branch: refs/heads/trunk
Commit: e68d466eb226134a73469648af5085da43669fd8
Parents: 504f66d cc8a05a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Nov 26 14:27:58 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Nov 26 14:27:58 2013 -0600
----------------------------------------------------------------------
src/java/org/apache/cassandra/db/Keyspace.java | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e68d466e/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Keyspace.java
index 4914c11,0000000..0280ed2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@@ -1,454 -1,0 +1,451 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.QueryPagers;
+import org.apache.cassandra.tracing.Tracing;
+
+/**
+ * It represents a Keyspace.
+ */
+public class Keyspace
+{
+ public static final String SYSTEM_KS = "system";
+ private static final int DEFAULT_PAGE_SIZE = 10000;
+
+ private static final Logger logger = LoggerFactory.getLogger(Keyspace.class);
+
+ /**
+ * accesses to CFS.memtable should acquire this for thread safety.
+ * CFS.maybeSwitchMemtable should acquire the writeLock; see that method for the full explanation.
+ * <p/>
+ * (Enabling fairness in the RRWL is observed to decrease throughput, so we leave it off.)
+ */
+ public static final ReentrantReadWriteLock switchLock = new ReentrantReadWriteLock();
+
+ // It is possible to call Keyspace.open without a running daemon, so it makes sense to ensure
+ // proper directories here as well as in CassandraDaemon.
+ static
+ {
+ if (!StorageService.instance.isClientMode())
+ DatabaseDescriptor.createAllDirectories();
+ }
+
+ public final KSMetaData metadata;
+
+ /* ColumnFamilyStore per column family */
+ private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>();
+ private volatile AbstractReplicationStrategy replicationStrategy;
+ public static final Function<String,Keyspace> keyspaceTransformer = new Function<String, Keyspace>()
+ {
+ public Keyspace apply(String keyspaceName)
+ {
+ return Keyspace.open(keyspaceName);
+ }
+ };
+
+ public static Keyspace open(String keyspaceName)
+ {
+ return open(keyspaceName, Schema.instance, true);
+ }
+
+ public static Keyspace openWithoutSSTables(String keyspaceName)
+ {
+ return open(keyspaceName, Schema.instance, false);
+ }
+
+ private static Keyspace open(String keyspaceName, Schema schema, boolean loadSSTables)
+ {
+ Keyspace keyspaceInstance = schema.getKeyspaceInstance(keyspaceName);
+
+ if (keyspaceInstance == null)
+ {
+ // instantiate the Keyspace. we could use putIfAbsent but it's important to make sure it is only done once
+ // per keyspace, so we synchronize and re-check before doing it.
+ synchronized (Keyspace.class)
+ {
+ keyspaceInstance = schema.getKeyspaceInstance(keyspaceName);
+ if (keyspaceInstance == null)
+ {
+ // open and store the keyspace
+ keyspaceInstance = new Keyspace(keyspaceName, loadSSTables);
+ schema.storeKeyspaceInstance(keyspaceInstance);
+
+ // keyspace has to be constructed and in the cache before cacheRow can be called
+ for (ColumnFamilyStore cfs : keyspaceInstance.getColumnFamilyStores())
+ cfs.initRowCache();
+ }
+ }
+ }
+ return keyspaceInstance;
+ }
+
+ public static Keyspace clear(String keyspaceName)
+ {
+ return clear(keyspaceName, Schema.instance);
+ }
+
+ public static Keyspace clear(String keyspaceName, Schema schema)
+ {
+ synchronized (Keyspace.class)
+ {
+ Keyspace t = schema.removeKeyspaceInstance(keyspaceName);
+ if (t != null)
+ {
+ for (ColumnFamilyStore cfs : t.getColumnFamilyStores())
+ t.unloadCf(cfs);
+ }
+ return t;
+ }
+ }
+
+ /**
+ * Removes every SSTable in the directory from the appropriate DataTracker's view.
+ * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
+ */
+ public static void removeUnreadableSSTables(File directory)
+ {
+ for (Keyspace keyspace : Keyspace.all())
+ {
+ for (ColumnFamilyStore baseCfs : keyspace.getColumnFamilyStores())
+ {
+ for (ColumnFamilyStore cfs : baseCfs.concatWithIndexes())
+ cfs.maybeRemoveUnreadableSSTables(directory);
+ }
+ }
+ }
+
+ public Collection<ColumnFamilyStore> getColumnFamilyStores()
+ {
+ return Collections.unmodifiableCollection(columnFamilyStores.values());
+ }
+
+ public ColumnFamilyStore getColumnFamilyStore(String cfName)
+ {
+ UUID id = Schema.instance.getId(getName(), cfName);
+ if (id == null)
+ throw new IllegalArgumentException(String.format("Unknown keyspace/cf pair (%s.%s)", getName(), cfName));
+ return getColumnFamilyStore(id);
+ }
+
+ public ColumnFamilyStore getColumnFamilyStore(UUID id)
+ {
+ ColumnFamilyStore cfs = columnFamilyStores.get(id);
+ if (cfs == null)
+ throw new IllegalArgumentException("Unknown CF " + id);
+ return cfs;
+ }
+
+ /**
+ * Take a snapshot of the specific column family, or the entire set of column families
+ * if columnFamily is null with a given timestamp
+ *
+ * @param snapshotName the tag associated with the name of the snapshot. This value may not be null
+ * @param columnFamilyName the column family to snapshot or all on null
+ * @throws IOException if the column family doesn't exist
+ */
+ public void snapshot(String snapshotName, String columnFamilyName) throws IOException
+ {
+ assert snapshotName != null;
+ boolean tookSnapShot = false;
+ for (ColumnFamilyStore cfStore : columnFamilyStores.values())
+ {
+ if (columnFamilyName == null || cfStore.name.equals(columnFamilyName))
+ {
+ tookSnapShot = true;
+ cfStore.snapshot(snapshotName);
+ }
+ }
+
+ if ((columnFamilyName != null) && !tookSnapShot)
+ throw new IOException("Failed taking snapshot. Column family " + columnFamilyName + " does not exist.");
+ }
+
+ /**
+ * @param clientSuppliedName may be null.
+ * @return the name of the snapshot
+ */
+ public static String getTimestampedSnapshotName(String clientSuppliedName)
+ {
+ String snapshotName = Long.toString(System.currentTimeMillis());
+ if (clientSuppliedName != null && !clientSuppliedName.equals(""))
+ {
+ snapshotName = snapshotName + "-" + clientSuppliedName;
+ }
+ return snapshotName;
+ }
+
+ /**
+ * Check whether a snapshot already exists for a given name.
+ *
+ * @param snapshotName the user supplied snapshot name
+ * @return true if the snapshot exists
+ */
+ public boolean snapshotExists(String snapshotName)
+ {
+ assert snapshotName != null;
+ for (ColumnFamilyStore cfStore : columnFamilyStores.values())
+ {
+ if (cfStore.snapshotExists(snapshotName))
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Clear all the snapshots for a given keyspace.
+ *
+ * @param snapshotName the user supplied snapshot name. If empty or null,
+ * all the snapshots will be cleaned
+ */
+ public void clearSnapshot(String snapshotName)
+ {
+ for (ColumnFamilyStore cfStore : columnFamilyStores.values())
+ {
+ cfStore.clearSnapshot(snapshotName);
+ }
+ }
+
+ /**
+ * @return A list of open SSTableReaders
+ */
+ public List<SSTableReader> getAllSSTables()
+ {
+ List<SSTableReader> list = new ArrayList<SSTableReader>(columnFamilyStores.size());
+ for (ColumnFamilyStore cfStore : columnFamilyStores.values())
+ list.addAll(cfStore.getSSTables());
+ return list;
+ }
+
+ private Keyspace(String keyspaceName, boolean loadSSTables)
+ {
+ metadata = Schema.instance.getKSMetaData(keyspaceName);
+ assert metadata != null : "Unknown keyspace " + keyspaceName;
+ createReplicationStrategy(metadata);
+
+ for (CFMetaData cfm : new ArrayList<CFMetaData>(metadata.cfMetaData().values()))
+ {
+ logger.debug("Initializing {}.{}", getName(), cfm.cfName);
+ initCf(cfm.cfId, cfm.cfName, loadSSTables);
+ }
+ }
+
+ public void createReplicationStrategy(KSMetaData ksm)
+ {
- if (replicationStrategy != null)
- StorageService.instance.getTokenMetadata().unregister(replicationStrategy);
-
+ replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(ksm.name,
+ ksm.strategyClass,
+ StorageService.instance.getTokenMetadata(),
+ DatabaseDescriptor.getEndpointSnitch(),
+ ksm.strategyOptions);
+ }
+
+ // best invoked on the compaction manager.
+ public void dropCf(UUID cfId)
+ {
+ assert columnFamilyStores.containsKey(cfId);
+ ColumnFamilyStore cfs = columnFamilyStores.remove(cfId);
+ if (cfs == null)
+ return;
+
+ unloadCf(cfs);
+ }
+
+ // disassociate a cfs from this keyspace instance.
+ private void unloadCf(ColumnFamilyStore cfs)
+ {
+ cfs.forceBlockingFlush();
+ cfs.invalidate();
+ }
+
+ /**
+ * adds a cf to internal structures (ends up creating disk files).
+ */
+ public void initCf(UUID cfId, String cfName, boolean loadSSTables)
+ {
+ ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
+
+ if (cfs == null)
+ {
+ // CFS being created for the first time, either on server startup or new CF being added.
+ // We don't worry about races here; startup is safe, and adding multiple identical CFs
+ // simultaneously is a "don't do that" scenario.
+ ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables));
+ // CFS mbean instantiation will error out before we hit this, but in case that changes...
+ if (oldCfs != null)
+ throw new IllegalStateException("added multiple mappings for cf id " + cfId);
+ }
+ else
+ {
+ // re-initializing an existing CF. This will happen if you cleared the schema
+ // on this node and it's getting repopulated from the rest of the cluster.
+ assert cfs.name.equals(cfName);
+ cfs.metadata.reload();
+ cfs.reload();
+ }
+ }
+
+ public Row getRow(QueryFilter filter)
+ {
+ ColumnFamilyStore cfStore = getColumnFamilyStore(filter.getColumnFamilyName());
+ ColumnFamily columnFamily = cfStore.getColumnFamily(filter);
+ return new Row(filter.key, columnFamily);
+ }
+
+ public void apply(RowMutation mutation, boolean writeCommitLog)
+ {
+ apply(mutation, writeCommitLog, true);
+ }
+
+ /**
+ * This method appends a row to the global CommitLog, then updates memtables and indexes.
+ *
+ * @param mutation the row to write. Must not be modified after calling apply, since commitlog append
+ * may happen concurrently, depending on the CL Executor type.
+ * @param writeCommitLog false to disable commitlog append entirely
+ * @param updateIndexes false to disable index updates (used by CollationController "defragmenting")
+ */
+ public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIndexes)
+ {
+ // write the mutation to the commitlog and memtables
+ Tracing.trace("Acquiring switchLock read lock");
+ switchLock.readLock().lock();
+ try
+ {
+ if (writeCommitLog)
+ {
+ Tracing.trace("Appending to commitlog");
+ CommitLog.instance.add(mutation);
+ }
+
+ DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key());
+ for (ColumnFamily cf : mutation.getColumnFamilies())
+ {
+ ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
+ if (cfs == null)
+ {
+ logger.error("Attempting to mutate non-existant column family " + cf.id());
+ continue;
+ }
+
+ Tracing.trace("Adding to {} memtable", cf.metadata().cfName);
+ cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key, cf) : SecondaryIndexManager.nullUpdater);
+ }
+ }
+ finally
+ {
+ switchLock.readLock().unlock();
+ }
+ }
+
+ public AbstractReplicationStrategy getReplicationStrategy()
+ {
+ return replicationStrategy;
+ }
+
+ /**
+ * @param key row to index
+ * @param cfs ColumnFamily to index row in
+ * @param idxNames columns to index, in comparator order
+ */
+ public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key));
+
+ Collection<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
+
+ switchLock.readLock().lock();
+ try
+ {
+ Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.key, DEFAULT_PAGE_SIZE);
+ while (pager.hasNext())
+ {
+ ColumnFamily cf = pager.next();
+ ColumnFamily cf2 = cf.cloneMeShallow();
+ for (Column column : cf)
+ {
+ if (cfs.indexManager.indexes(column.name(), indexes))
+ cf2.addColumn(column);
+ }
+ cfs.indexManager.indexRow(key.key, cf2);
+ }
+ }
+ finally
+ {
+ switchLock.readLock().unlock();
+ }
+ }
+
+ public List<Future<?>> flush()
+ {
+ List<Future<?>> futures = new ArrayList<Future<?>>(columnFamilyStores.size());
+ for (UUID cfId : columnFamilyStores.keySet())
+ futures.add(columnFamilyStores.get(cfId).forceFlush());
+ return futures;
+ }
+
+ public static Iterable<Keyspace> all()
+ {
+ return Iterables.transform(Schema.instance.getKeyspaces(), keyspaceTransformer);
+ }
+
+ public static Iterable<Keyspace> nonSystem()
+ {
+ return Iterables.transform(Schema.instance.getNonSystemKeyspaces(), keyspaceTransformer);
+ }
+
+ public static Iterable<Keyspace> system()
+ {
+ return Iterables.transform(Schema.systemKeyspaceNames, keyspaceTransformer);
+ }
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + "(name='" + getName() + "')";
+ }
+
+ public String getName()
+ {
+ return metadata.name;
+ }
+}
[3/6] git commit: fix build
Posted by jb...@apache.org.
fix build
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc8a05ab
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc8a05ab
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc8a05ab
Branch: refs/heads/trunk
Commit: cc8a05ab6ac22f019e60ec79c11338d4c77d49c3
Parents: 8145c83
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Nov 26 14:27:52 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Nov 26 14:27:52 2013 -0600
----------------------------------------------------------------------
src/java/org/apache/cassandra/db/Table.java | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8a05ab/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index a851eee..e6df982 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -275,9 +275,6 @@ public class Table
public void createReplicationStrategy(KSMetaData ksm)
{
- if (replicationStrategy != null)
- StorageService.instance.getTokenMetadata().unregister(replicationStrategy);
-
replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(ksm.name,
ksm.strategyClass,
StorageService.instance.getTokenMetadata(),
[5/6] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Posted by jb...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e68d466e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e68d466e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e68d466e
Branch: refs/heads/cassandra-2.0
Commit: e68d466eb226134a73469648af5085da43669fd8
Parents: 504f66d cc8a05a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Nov 26 14:27:58 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Nov 26 14:27:58 2013 -0600
----------------------------------------------------------------------
src/java/org/apache/cassandra/db/Keyspace.java | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e68d466e/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Keyspace.java
index 4914c11,0000000..0280ed2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@@ -1,454 -1,0 +1,451 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.QueryPagers;
+import org.apache.cassandra.tracing.Tracing;
+
+/**
+ * It represents a Keyspace.
+ */
+public class Keyspace
+{
+ public static final String SYSTEM_KS = "system";
+ private static final int DEFAULT_PAGE_SIZE = 10000;
+
+ private static final Logger logger = LoggerFactory.getLogger(Keyspace.class);
+
+ /**
+ * accesses to CFS.memtable should acquire this for thread safety.
+ * CFS.maybeSwitchMemtable should acquire the writeLock; see that method for the full explanation.
+ * <p/>
+ * (Enabling fairness in the RRWL is observed to decrease throughput, so we leave it off.)
+ */
+ public static final ReentrantReadWriteLock switchLock = new ReentrantReadWriteLock();
+
+ // It is possible to call Keyspace.open without a running daemon, so it makes sense to ensure
+ // proper directories here as well as in CassandraDaemon.
+ static
+ {
+ if (!StorageService.instance.isClientMode())
+ DatabaseDescriptor.createAllDirectories();
+ }
+
+ public final KSMetaData metadata;
+
+ /* ColumnFamilyStore per column family */
+ private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>();
+ private volatile AbstractReplicationStrategy replicationStrategy;
+ public static final Function<String,Keyspace> keyspaceTransformer = new Function<String, Keyspace>()
+ {
+ public Keyspace apply(String keyspaceName)
+ {
+ return Keyspace.open(keyspaceName);
+ }
+ };
+
+ public static Keyspace open(String keyspaceName)
+ {
+ return open(keyspaceName, Schema.instance, true);
+ }
+
+ public static Keyspace openWithoutSSTables(String keyspaceName)
+ {
+ return open(keyspaceName, Schema.instance, false);
+ }
+
+ private static Keyspace open(String keyspaceName, Schema schema, boolean loadSSTables)
+ {
+ Keyspace keyspaceInstance = schema.getKeyspaceInstance(keyspaceName);
+
+ if (keyspaceInstance == null)
+ {
+ // instantiate the Keyspace. we could use putIfAbsent but it's important to make sure it is only done once
+ // per keyspace, so we synchronize and re-check before doing it.
+ synchronized (Keyspace.class)
+ {
+ keyspaceInstance = schema.getKeyspaceInstance(keyspaceName);
+ if (keyspaceInstance == null)
+ {
+ // open and store the keyspace
+ keyspaceInstance = new Keyspace(keyspaceName, loadSSTables);
+ schema.storeKeyspaceInstance(keyspaceInstance);
+
+ // keyspace has to be constructed and in the cache before cacheRow can be called
+ for (ColumnFamilyStore cfs : keyspaceInstance.getColumnFamilyStores())
+ cfs.initRowCache();
+ }
+ }
+ }
+ return keyspaceInstance;
+ }
+
+ public static Keyspace clear(String keyspaceName)
+ {
+ return clear(keyspaceName, Schema.instance);
+ }
+
+ public static Keyspace clear(String keyspaceName, Schema schema)
+ {
+ synchronized (Keyspace.class)
+ {
+ Keyspace t = schema.removeKeyspaceInstance(keyspaceName);
+ if (t != null)
+ {
+ for (ColumnFamilyStore cfs : t.getColumnFamilyStores())
+ t.unloadCf(cfs);
+ }
+ return t;
+ }
+ }
+
+ /**
+ * Removes every SSTable in the directory from the appropriate DataTracker's view.
+ * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
+ */
+ public static void removeUnreadableSSTables(File directory)
+ {
+ for (Keyspace keyspace : Keyspace.all())
+ {
+ for (ColumnFamilyStore baseCfs : keyspace.getColumnFamilyStores())
+ {
+ for (ColumnFamilyStore cfs : baseCfs.concatWithIndexes())
+ cfs.maybeRemoveUnreadableSSTables(directory);
+ }
+ }
+ }
+
+ public Collection<ColumnFamilyStore> getColumnFamilyStores()
+ {
+ return Collections.unmodifiableCollection(columnFamilyStores.values());
+ }
+
+ public ColumnFamilyStore getColumnFamilyStore(String cfName)
+ {
+ UUID id = Schema.instance.getId(getName(), cfName);
+ if (id == null)
+ throw new IllegalArgumentException(String.format("Unknown keyspace/cf pair (%s.%s)", getName(), cfName));
+ return getColumnFamilyStore(id);
+ }
+
+ public ColumnFamilyStore getColumnFamilyStore(UUID id)
+ {
+ ColumnFamilyStore cfs = columnFamilyStores.get(id);
+ if (cfs == null)
+ throw new IllegalArgumentException("Unknown CF " + id);
+ return cfs;
+ }
+
+ /**
+ * Take a snapshot of the specific column family, or the entire set of column families
+ * if columnFamily is null with a given timestamp
+ *
+ * @param snapshotName the tag associated with the name of the snapshot. This value may not be null
+ * @param columnFamilyName the column family to snapshot or all on null
+ * @throws IOException if the column family doesn't exist
+ */
+ public void snapshot(String snapshotName, String columnFamilyName) throws IOException
+ {
+ assert snapshotName != null;
+ boolean tookSnapShot = false;
+ for (ColumnFamilyStore cfStore : columnFamilyStores.values())
+ {
+ if (columnFamilyName == null || cfStore.name.equals(columnFamilyName))
+ {
+ tookSnapShot = true;
+ cfStore.snapshot(snapshotName);
+ }
+ }
+
+ if ((columnFamilyName != null) && !tookSnapShot)
+ throw new IOException("Failed taking snapshot. Column family " + columnFamilyName + " does not exist.");
+ }
+
+ /**
+ * @param clientSuppliedName may be null.
+ * @return the name of the snapshot
+ */
+ public static String getTimestampedSnapshotName(String clientSuppliedName)
+ {
+ String snapshotName = Long.toString(System.currentTimeMillis());
+ if (clientSuppliedName != null && !clientSuppliedName.equals(""))
+ {
+ snapshotName = snapshotName + "-" + clientSuppliedName;
+ }
+ return snapshotName;
+ }
+
+ /**
+ * Check whether a snapshot already exists for a given name.
+ *
+ * @param snapshotName the user supplied snapshot name
+ * @return true if the snapshot exists
+ */
+ public boolean snapshotExists(String snapshotName)
+ {
+ assert snapshotName != null;
+ for (ColumnFamilyStore cfStore : columnFamilyStores.values())
+ {
+ if (cfStore.snapshotExists(snapshotName))
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Clear all the snapshots for a given keyspace.
+ *
+ * @param snapshotName the user supplied snapshot name. If empty or null,
+ * all the snapshots will be cleaned
+ */
+ public void clearSnapshot(String snapshotName)
+ {
+ for (ColumnFamilyStore cfStore : columnFamilyStores.values())
+ {
+ cfStore.clearSnapshot(snapshotName);
+ }
+ }
+
+ /**
+ * @return A list of open SSTableReaders
+ */
+ public List<SSTableReader> getAllSSTables()
+ {
+ List<SSTableReader> list = new ArrayList<SSTableReader>(columnFamilyStores.size());
+ for (ColumnFamilyStore cfStore : columnFamilyStores.values())
+ list.addAll(cfStore.getSSTables());
+ return list;
+ }
+
+ private Keyspace(String keyspaceName, boolean loadSSTables)
+ {
+ metadata = Schema.instance.getKSMetaData(keyspaceName);
+ assert metadata != null : "Unknown keyspace " + keyspaceName;
+ createReplicationStrategy(metadata);
+
+ for (CFMetaData cfm : new ArrayList<CFMetaData>(metadata.cfMetaData().values()))
+ {
+ logger.debug("Initializing {}.{}", getName(), cfm.cfName);
+ initCf(cfm.cfId, cfm.cfName, loadSSTables);
+ }
+ }
+
+ public void createReplicationStrategy(KSMetaData ksm)
+ {
- if (replicationStrategy != null)
- StorageService.instance.getTokenMetadata().unregister(replicationStrategy);
-
+ replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(ksm.name,
+ ksm.strategyClass,
+ StorageService.instance.getTokenMetadata(),
+ DatabaseDescriptor.getEndpointSnitch(),
+ ksm.strategyOptions);
+ }
+
+ // best invoked on the compaction manager.
+ public void dropCf(UUID cfId)
+ {
+ assert columnFamilyStores.containsKey(cfId);
+ ColumnFamilyStore cfs = columnFamilyStores.remove(cfId);
+ if (cfs == null)
+ return;
+
+ unloadCf(cfs);
+ }
+
+ // disassociate a cfs from this keyspace instance.
+ private void unloadCf(ColumnFamilyStore cfs)
+ {
+ cfs.forceBlockingFlush();
+ cfs.invalidate();
+ }
+
+ /**
+ * adds a cf to internal structures (ends up creating disk files).
+ */
+ public void initCf(UUID cfId, String cfName, boolean loadSSTables)
+ {
+ ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
+
+ if (cfs == null)
+ {
+ // CFS being created for the first time, either on server startup or new CF being added.
+ // We don't worry about races here; startup is safe, and adding multiple idential CFs
+ // simultaneously is a "don't do that" scenario.
+ ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables));
+ // CFS mbean instantiation will error out before we hit this, but in case that changes...
+ if (oldCfs != null)
+ throw new IllegalStateException("added multiple mappings for cf id " + cfId);
+ }
+ else
+ {
+ // re-initializing an existing CF. This will happen if you cleared the schema
+ // on this node and it's getting repopulated from the rest of the cluster.
+ assert cfs.name.equals(cfName);
+ cfs.metadata.reload();
+ cfs.reload();
+ }
+ }
+
+ /**
+ * Runs the filter against the column family store it names.
+ *
+ * @param filter supplies the column family name, the row key and the query itself
+ * @return a Row pairing filter.key with the column family returned by the store
+ */
+ public Row getRow(QueryFilter filter)
+ {
+ // resolve the store by name and query it in one step
+ ColumnFamily data = getColumnFamilyStore(filter.getColumnFamilyName()).getColumnFamily(filter);
+ return new Row(filter.key, data);
+ }
+
+ /**
+ * Applies the mutation with index updates enabled; same as apply(mutation, writeCommitLog, true).
+ *
+ * @param mutation the row to write
+ * @param writeCommitLog false to disable commitlog append entirely
+ */
+ public void apply(RowMutation mutation, boolean writeCommitLog)
+ {
+ apply(mutation, writeCommitLog, true);
+ }
+
+ /**
+ * This method appends a row to the global CommitLog, then updates memtables and indexes.
+ * The switchLock read lock is held for the whole operation. Column families whose id is
+ * not registered in this keyspace are logged and skipped.
+ *
+ * @param mutation the row to write. Must not be modified after calling apply, since commitlog append
+ * may happen concurrently, depending on the CL Executor type.
+ * @param writeCommitLog false to disable commitlog append entirely
+ * @param updateIndexes false to disable index updates (used by CollationController "defragmenting")
+ */
+ public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIndexes)
+ {
+ // write the mutation to the commitlog and memtables
+ Tracing.trace("Acquiring switchLock read lock");
+ switchLock.readLock().lock();
+ try
+ {
+ if (writeCommitLog)
+ {
+ Tracing.trace("Appending to commitlog");
+ CommitLog.instance.add(mutation);
+ }
+
+ // the key is decorated once and shared by every column family of the mutation
+ DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key());
+ for (ColumnFamily cf : mutation.getColumnFamilies())
+ {
+ ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
+ if (cfs == null)
+ {
+ // unknown cf id: log it and carry on with the remaining column families
+ logger.error("Attempting to mutate non-existant column family " + cf.id());
+ continue;
+ }
+
+ Tracing.trace("Adding to {} memtable", cf.metadata().cfName);
+ // with index updates disabled, SecondaryIndexManager.nullUpdater is passed instead of a real updater
+ cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key, cf) : SecondaryIndexManager.nullUpdater);
+ }
+ }
+ finally
+ {
+ // always release the read lock, even if the commitlog append or a memtable update throws
+ switchLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * @return the replication strategy currently held by this keyspace, as last set by createReplicationStrategy
+ */
+ public AbstractReplicationStrategy getReplicationStrategy()
+ {
+ return replicationStrategy;
+ }
+
+ /**
+ * Feeds one row to the named secondary indexes, reading the row page by page
+ * while holding the switchLock read lock.
+ *
+ * @param key row to index
+ * @param cfs ColumnFamily to index row in
+ * @param idxNames columns to index, in comparator order
+ */
+ public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key));
+
+ // resolve the index names once, before taking the lock
+ Collection<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
+
+ // same read lock that apply() takes around its writes
+ switchLock.readLock().lock();
+ try
+ {
+ // read the row locally in pages of DEFAULT_PAGE_SIZE
+ Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.key, DEFAULT_PAGE_SIZE);
+ while (pager.hasNext())
+ {
+ ColumnFamily cf = pager.next();
+ // cf2 collects the columns of this page that need indexing
+ // NOTE(review): assumes cloneMeShallow() yields a copy without the columns -- confirm
+ ColumnFamily cf2 = cf.cloneMeShallow();
+ for (Column column : cf)
+ {
+ // keep only columns covered by one of the requested indexes
+ if (cfs.indexManager.indexes(column.name(), indexes))
+ cf2.addColumn(column);
+ }
+ cfs.indexManager.indexRow(key.key, cf2);
+ }
+ }
+ finally
+ {
+ switchLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Requests a flush of every column family store in this keyspace.
+ *
+ * @return a new list holding the result of forceFlush() for each store
+ */
+ public List<Future<?>> flush()
+ {
+ List<Future<?>> futures = new ArrayList<Future<?>>(columnFamilyStores.size());
+ // Iterate the stores directly instead of re-fetching each one by id: a store removed by
+ // dropCf between the key iteration and the lookup would make get() return null and the
+ // forceFlush() call throw a NullPointerException.
+ for (ColumnFamilyStore cfs : columnFamilyStores.values())
+ futures.add(cfs.forceFlush());
+ return futures;
+ }
+
+ /**
+ * @return the result of applying keyspaceTransformer to Schema.instance.getKeyspaces()
+ */
+ public static Iterable<Keyspace> all()
+ {
+ return Iterables.transform(Schema.instance.getKeyspaces(), keyspaceTransformer);
+ }
+
+ /**
+ * @return the result of applying keyspaceTransformer to Schema.instance.getNonSystemKeyspaces()
+ */
+ public static Iterable<Keyspace> nonSystem()
+ {
+ return Iterables.transform(Schema.instance.getNonSystemKeyspaces(), keyspaceTransformer);
+ }
+
+ /**
+ * @return the result of applying keyspaceTransformer to Schema.systemKeyspaceNames
+ */
+ public static Iterable<Keyspace> system()
+ {
+ return Iterables.transform(Schema.systemKeyspaceNames, keyspaceTransformer);
+ }
+
+ /**
+ * @return the simple class name followed by the keyspace name, e.g. Keyspace(name='ks1')
+ */
+ @Override
+ public String toString()
+ {
+ StringBuilder description = new StringBuilder(getClass().getSimpleName());
+ description.append("(name='").append(getName()).append("')");
+ return description.toString();
+ }
+
+ /**
+ * @return the keyspace name as recorded in this keyspace's metadata
+ */
+ public String getName()
+ {
+ return metadata.name;
+ }
+}
[6/6] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c384d31b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c384d31b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c384d31b
Branch: refs/heads/trunk
Commit: c384d31b8b6289f896625a9fc5777bac62ebbbe8
Parents: 1bfd062 e68d466
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Nov 26 14:28:16 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Nov 26 14:28:16 2013 -0600
----------------------------------------------------------------------
src/java/org/apache/cassandra/db/Keyspace.java | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c384d31b/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------