You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/12/06 04:26:30 UTC
git commit: Fix merge wonkiness.
Updated Branches:
refs/heads/trunk 94e37b4e4 -> 989dd44c0
Fix merge wonkiness.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/989dd44c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/989dd44c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/989dd44c
Branch: refs/heads/trunk
Commit: 989dd44c078bcfd4a1fbe825b104e3b9bce1435c
Parents: 94e37b4
Author: Brandon Williams <br...@apache.org>
Authored: Wed Dec 5 21:26:15 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Dec 5 21:26:15 2012 -0600
----------------------------------------------------------------------
src/java/org/apache/cassandra/config/Config.java | 1 -
.../apache/cassandra/thrift/CassandraDaemon.java | 223 ---------------
2 files changed, 0 insertions(+), 224 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/989dd44c/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 609633c..4dd1dff 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -106,7 +106,6 @@ public class Config
public Integer in_memory_compaction_limit_in_mb = 64;
public Integer concurrent_compactors = FBUtilities.getAvailableProcessors();
public volatile Integer compaction_throughput_mb_per_sec = 16;
- public Integer compaction_throughput_mb_per_sec = 16;
public Boolean multithreaded_compaction = false;
public Integer max_streaming_retries = 3;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/989dd44c/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraDaemon.java b/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
deleted file mode 100644
index 572e3e0..0000000
--- a/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.thrift;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.service.AbstractCassandraDaemon;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
-
-/**
- * This class supports two methods for creating a Cassandra node daemon,
- * invoking the class's main method, and using the jsvc wrapper from
- * commons-daemon, (for more information on using this class with the
- * jsvc wrapper, see the
- * <a href="http://commons.apache.org/daemon/jsvc.html">Commons Daemon</a>
- * documentation).
- */
-
-public class CassandraDaemon extends org.apache.cassandra.service.AbstractCassandraDaemon
-{
- protected static CassandraDaemon instance;
-
- static
- {
- AbstractCassandraDaemon.initLog4j();
- }
-
- private static Logger logger = LoggerFactory.getLogger(CassandraDaemon.class);
- private final static String SYNC = "sync";
- private final static String ASYNC = "async";
- private final static String HSHA = "hsha";
- public final static List<String> rpc_server_types = Arrays.asList(SYNC, ASYNC, HSHA);
- private ThriftServer server;
-
- protected void startServer()
- {
- if (server == null)
- {
- server = new ThriftServer(listenAddr, listenPort);
- server.start();
- }
- }
-
- protected void stopServer()
- {
- if (server != null)
- {
- server.stopServer();
- try
- {
- server.join();
- }
- catch (InterruptedException e)
- {
- logger.error("Interrupted while waiting thrift server to stop", e);
- }
- server = null;
- }
- }
-
- public static void stop(String[] args)
- {
- instance.stopServer();
- instance.deactivate();
- }
-
- public static void main(String[] args)
- {
- instance = new CassandraDaemon();
- instance.activate();
- }
-
- /**
- * Simple class to run the thrift connection accepting code in separate
- * thread of control.
- */
- private static class ThriftServer extends Thread
- {
- private TServer serverEngine;
-
- public ThriftServer(InetAddress listenAddr, int listenPort)
- {
- // now we start listening for clients
- final CassandraServer cassandraServer = new CassandraServer();
- Cassandra.Processor processor = new Cassandra.Processor(cassandraServer);
-
- // Transport
- logger.info(String.format("Binding thrift service to %s:%s", listenAddr, listenPort));
-
- // Protocol factory
- TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true, true, DatabaseDescriptor.getThriftMaxMessageLength());
-
- // Transport factory
- int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize();
- TTransportFactory inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
- TTransportFactory outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
- logger.info("Using TFastFramedTransport with a max frame size of {} bytes.", tFramedTransportSize);
-
- if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(SYNC))
- {
- TServerTransport serverTransport;
- try
- {
- serverTransport = new TCustomServerSocket(new InetSocketAddress(listenAddr, listenPort),
- DatabaseDescriptor.getRpcKeepAlive(),
- DatabaseDescriptor.getRpcSendBufferSize(),
- DatabaseDescriptor.getRpcRecvBufferSize());
- }
- catch (TTransportException e)
- {
- throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e);
- }
- // ThreadPool Server and will be invocation per connection basis...
- TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
- .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
- .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
- .inputTransportFactory(inTransportFactory)
- .outputTransportFactory(outTransportFactory)
- .inputProtocolFactory(tProtocolFactory)
- .outputProtocolFactory(tProtocolFactory)
- .processor(processor);
- ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState, serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads);
- serverEngine = new CustomTThreadPoolServer(serverArgs, executorService);
- logger.info(String.format("Using synchronous/threadpool thrift server on %s : %s", listenAddr, listenPort));
- }
- else
- {
- TNonblockingServerTransport serverTransport;
- try
- {
- serverTransport = new TCustomNonblockingServerSocket(new InetSocketAddress(listenAddr, listenPort),
- DatabaseDescriptor.getRpcKeepAlive(),
- DatabaseDescriptor.getRpcSendBufferSize(),
- DatabaseDescriptor.getRpcRecvBufferSize());
- }
- catch (TTransportException e)
- {
- throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e);
- }
-
- if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ASYNC))
- {
- // This is single threaded hence the invocation will be all
- // in one thread.
- TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
- .outputTransportFactory(outTransportFactory)
- .inputProtocolFactory(tProtocolFactory)
- .outputProtocolFactory(tProtocolFactory)
- .processor(processor);
- logger.info(String.format("Using non-blocking/asynchronous thrift server on %s : %s", listenAddr, listenPort));
- serverEngine = new CustomTNonBlockingServer(serverArgs);
- }
- else if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(HSHA))
- {
- // This is NIO selector service but the invocation will be Multi-Threaded with the Executor service.
- ExecutorService executorService = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
- DatabaseDescriptor.getRpcMaxThreads(),
- 60L,
- TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(),
- new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
- TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
- .outputTransportFactory(outTransportFactory)
- .inputProtocolFactory(tProtocolFactory)
- .outputProtocolFactory(tProtocolFactory)
- .processor(processor);
- logger.info(String.format("Using custom half-sync/half-async thrift server on %s : %s", listenAddr, listenPort));
- // Check for available processors in the system which will be equal to the IO Threads.
- serverEngine = new CustomTHsHaServer(serverArgs, executorService, FBUtilities.getAvailableProcessors());
- }
- }
- }
-
- public void run()
- {
- logger.info("Listening for thrift clients...");
- serverEngine.serve();
- }
-
- public void stopServer()
- {
- logger.info("Stop listening to thrift clients");
- serverEngine.stop();
- }
- }
-}