You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2012/09/05 17:21:16 UTC
[1/3] git commit: Merge branch 'cassandra-1.1' into trunk
Updated Branches:
refs/heads/cassandra-1.1 846b14019 -> 8eb2fed1e
refs/heads/trunk bf2d3433b -> 00e715054
Merge branch 'cassandra-1.1' into trunk
Conflicts:
src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/00e71505
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/00e71505
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/00e71505
Branch: refs/heads/trunk
Commit: 00e7150545f5109ac59dd92d8520a75cf17a9c48
Parents: bf2d343 8eb2fed
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Sep 5 10:19:43 2012 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Sep 5 10:19:43 2012 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/db/ColumnFamilyStore.java | 6 ++
.../db/compaction/AbstractCompactionStrategy.java | 14 -----
.../apache/cassandra/service/CassandraDaemon.java | 39 ++++++++++++++-
.../org/apache/cassandra/utils/DefaultInteger.java | 5 ++
4 files changed, 49 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e71505/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e71505/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 12fdaf3,93c6298..d02a003
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -25,7 -27,7 +25,6 @@@ import org.apache.cassandra.dht.Range
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
--import org.apache.cassandra.service.StorageService;
/**
* Pluggable compaction strategy determines how SSTables get merged.
@@@ -50,22 -47,6 +49,9 @@@ public abstract class AbstractCompactio
assert cfs != null;
this.cfs = cfs;
this.options = options;
+
+ String optionValue = options.get(TOMBSTONE_THRESHOLD_KEY);
+ tombstoneThreshold = optionValue == null ? DEFAULT_TOMBSTONE_THRESHOLD : Float.parseFloat(optionValue);
-
- // start compactions in five minutes (if no flushes have occurred by then to do so)
- Runnable runnable = new Runnable()
- {
- public void run()
- {
- if (CompactionManager.instance.getActiveCompactions() == 0)
- {
- CompactionManager.instance.submitBackground(AbstractCompactionStrategy.this.cfs);
- }
- }
- };
- StorageService.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS);
}
public Map<String, String> getOptions()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e71505/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index efe6564,6b048b5..4e64fa3
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -7,279 -9,27 +7,316 @@@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
+package org.apache.cassandra.service;
-
+import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Arrays;
++import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Iterables;
+import org.apache.log4j.PropertyConfigurator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.commitlog.CommitLog;
++import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.thrift.ThriftServer;
+import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.Mx4jTool;
/**
- * The <code>CassandraDaemon</code> interface captures the lifecycle of a
- * Cassandra daemon that runs on a single node.
- *
+ * The <code>CassandraDaemon</code> is an abstraction for a Cassandra daemon
+ * service, which defines not only a way to activate and deactivate it, but also
+ * hooks into its lifecycle methods (see {@link #setup()}, {@link #start()},
+ * {@link #stop()} and {@link #setup()}).
*/
-public interface CassandraDaemon
+public class CassandraDaemon
{
+ static
+ {
+ initLog4j();
+ }
+
+ /**
+ * Initialize logging in such a way that it checks for config changes every 10 seconds.
+ */
+ public static void initLog4j()
+ {
+ if (System.getProperty("log4j.defaultInitOverride","false").equalsIgnoreCase("true"))
+ {
+ String config = System.getProperty("log4j.configuration", "log4j-server.properties");
+ URL configLocation = null;
+ try
+ {
+ // try loading from a physical location first.
+ configLocation = new URL(config);
+ }
+ catch (MalformedURLException ex)
+ {
+ // then try loading from the classpath.
+ configLocation = CassandraDaemon.class.getClassLoader().getResource(config);
+ }
+
+ if (configLocation == null)
+ throw new RuntimeException("Couldn't figure out log4j configuration: "+config);
+
+ // Now convert URL to a filename
+ String configFileName = null;
+ try
+ {
+ // first try URL.getFile() which works for opaque URLs (file:foo) and paths without spaces
+ configFileName = configLocation.getFile();
+ File configFile = new File(configFileName);
+ // then try alternative approach which works for all hierarchical URLs with or without spaces
+ if (!configFile.exists())
+ configFileName = new File(configLocation.toURI()).getCanonicalPath();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Couldn't convert log4j configuration location to a valid file", e);
+ }
+
+ PropertyConfigurator.configureAndWatch(configFileName, 10000);
+ org.apache.log4j.Logger.getLogger(CassandraDaemon.class).info("Logging initialized");
+ }
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(CassandraDaemon.class);
+
+ private static final CassandraDaemon instance = new CassandraDaemon();
+
+ static final AtomicInteger exceptions = new AtomicInteger();
+
+ public Server thriftServer;
+ public Server nativeServer;
+
+ /**
+ * This is a hook for concrete daemons to initialize themselves suitably.
+ *
+ * Subclasses should override this to finish the job (listening on ports, etc.)
+ *
+ * @throws IOException
+ */
+ protected void setup() throws IOException
+ {
+ logger.info("JVM vendor/version: {}/{}", System.getProperty("java.vm.name"), System.getProperty("java.version") );
+ logger.info("Heap size: {}/{}", Runtime.getRuntime().totalMemory(), Runtime.getRuntime().maxMemory());
+ logger.info("Classpath: {}", System.getProperty("java.class.path"));
+ CLibrary.tryMlockall();
+
+ Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
+ {
+ public void uncaughtException(Thread t, Throwable e)
+ {
+ exceptions.incrementAndGet();
+ logger.error("Exception in thread " + t, e);
+ for (Throwable e2 = e; e2 != null; e2 = e2.getCause())
+ {
+ // some code, like FileChannel.map, will wrap an OutOfMemoryError in another exception
+ if (e2 instanceof OutOfMemoryError)
+ System.exit(100);
+
+ if (e2 instanceof FSError)
+ {
+ if (e2 != e) // make sure FSError gets logged exactly once.
+ logger.error("Exception in thread " + t, e2);
+ handleFSError((FSError) e2);
+ }
+ }
+ }
+
+ private void handleFSError(FSError e)
+ {
+ switch (DatabaseDescriptor.getDiskFailurePolicy())
+ {
+ case stop:
+ logger.error("Stopping the gossiper and the RPC server");
+ StorageService.instance.stopGossiping();
+ StorageService.instance.stopRPCServer();
+ break;
+ case best_effort:
+ // for both read and write errors mark the path as unwritable.
+ BlacklistedDirectories.maybeMarkUnwritable(e.path);
+ if (e instanceof FSReadError)
+ {
+ File directory = BlacklistedDirectories.maybeMarkUnreadable(e.path);
+ if (directory != null)
+ Table.removeUnreadableSSTables(directory);
+ }
+ break;
+ case ignore:
+ // already logged, so left nothing to do
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+ });
+
+ // check all directories(data, commitlog, saved cache) for existence and permission
+ Iterable<String> dirs = Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
+ Arrays.asList(DatabaseDescriptor.getCommitLogLocation(),
+ DatabaseDescriptor.getSavedCachesLocation()));
+ for (String dataDir : dirs)
+ {
+ logger.debug("Checking directory {}", dataDir);
+ File dir = new File(dataDir);
+ if (dir.exists())
+ assert dir.isDirectory() && dir.canRead() && dir.canWrite() && dir.canExecute()
+ : String.format("Directory %s is not accessible.", dataDir);
+ }
+
+ // Migrate sstables from pre-#2749 to the correct location
+ if (Directories.sstablesNeedsMigration())
+ Directories.migrateSSTables();
+
+ if (CacheService.instance == null) // should never happen
+ throw new RuntimeException("Failed to initialize Cache Service.");
+
+ // check the system table to keep user from shooting self in foot by changing partitioner, cluster name, etc.
+ // we do a one-off scrub of the system table first; we can't load the list of the rest of the tables,
+ // until system table is opened.
+ for (CFMetaData cfm : Schema.instance.getTableMetaData(Table.SYSTEM_KS).values())
+ ColumnFamilyStore.scrubDataDirectories(Table.SYSTEM_KS, cfm.cfName);
+ try
+ {
+ SystemTable.checkHealth();
+ }
+ catch (ConfigurationException e)
+ {
+ logger.error("Fatal exception during initialization", e);
+ System.exit(100);
+ }
+
+ // load keyspace descriptions.
+ try
+ {
+ DatabaseDescriptor.loadSchemas();
+ }
+ catch (IOException e)
+ {
+ logger.error("Fatal exception during initialization", e);
+ System.exit(100);
+ }
+
+ // clean up debris in the rest of the tables
+ for (String table : Schema.instance.getTables())
+ {
+ for (CFMetaData cfm : Schema.instance.getTableMetaData(table).values())
+ {
+ ColumnFamilyStore.scrubDataDirectories(table, cfm.cfName);
+ }
+ }
+
+ // initialize keyspaces
+ for (String table : Schema.instance.getTables())
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("opening keyspace " + table);
- Table.open(table);
++ // disable auto compaction until commit log replay ends
++ for (ColumnFamilyStore cfs : Table.open(table).getColumnFamilyStores())
++ {
++ for (ColumnFamilyStore store : cfs.concatWithIndexes())
++ {
++ store.disableAutoCompaction();
++ }
++ }
+ }
+
+ if (CacheService.instance.keyCache.size() > 0)
+ logger.info("completed pre-loading ({} keys) key cache.", CacheService.instance.keyCache.size());
+
+ if (CacheService.instance.rowCache.size() > 0)
+ logger.info("completed pre-loading ({} keys) row cache.", CacheService.instance.rowCache.size());
+
+ try
+ {
+ GCInspector.instance.start();
+ }
+ catch (Throwable t)
+ {
+ logger.warn("Unable to start GCInspector (currently only supported on the Sun JVM)");
+ }
+
+ // replay the log if necessary
+ CommitLog.instance.recover();
+
++ // enable auto compaction
++ for (Table table : Table.all())
++ {
++ for (ColumnFamilyStore cfs : table.getColumnFamilyStores())
++ {
++ for (final ColumnFamilyStore store : cfs.concatWithIndexes())
++ {
++ store.enableAutoCompaction();
++ }
++ }
++ }
++ // start compactions in five minutes (if no flushes have occurred by then to do so)
++ Runnable runnable = new Runnable()
++ {
++ public void run()
++ {
++ for (Table table : Table.all())
++ {
++ for (ColumnFamilyStore cf : table.getColumnFamilyStores())
++ {
++ for (ColumnFamilyStore store : cf.concatWithIndexes())
++ CompactionManager.instance.submitBackground(store);
++ }
++ }
++ }
++ };
++ StorageService.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS);
++
+ SystemTable.finishStartup();
+
+ // start server internals
+ StorageService.instance.registerDaemon(this);
+ try
+ {
+ StorageService.instance.initServer();
+ }
+ catch (ConfigurationException e)
+ {
+ logger.error("Fatal configuration error", e);
+ System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server. See log for stacktrace.");
+ System.exit(1);
+ }
+
+ Mx4jTool.maybeLoad();
+
+ // Thift
+ InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
+ int rpcPort = DatabaseDescriptor.getRpcPort();
+ thriftServer = new ThriftServer(rpcAddr, rpcPort);
+
+ // Native transport
+ InetAddress nativeAddr = DatabaseDescriptor.getNativeTransportAddress();
+ int nativePort = DatabaseDescriptor.getNativeTransportPort();
+ nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, nativePort);
+ }
+
/**
* Initialize the Cassandra Daemon based on the given <a
* href="http://commons.apache.org/daemon/jsvc.html">Commons
http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e71505/src/java/org/apache/cassandra/utils/DefaultInteger.java
----------------------------------------------------------------------