You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/05/19 23:20:41 UTC
git commit: Only serve thrift over HTTP.
Repository: incubator-aurora
Updated Branches:
refs/heads/master b9ab5ff99 -> 10fce64b3
Only serve thrift over HTTP.
Bugs closed: AURORA-342
Reviewed at https://reviews.apache.org/r/21294/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/10fce64b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/10fce64b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/10fce64b
Branch: refs/heads/master
Commit: 10fce64b35a486dfc229dd7ba989f6dc73680032
Parents: b9ab5ff
Author: Bill Farner <wf...@apache.org>
Authored: Mon May 19 14:04:55 2014 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Mon May 19 14:04:55 2014 -0700
----------------------------------------------------------------------
.../aurora/scheduler/app/SchedulerMain.java | 9 +-
.../aurora/scheduler/thrift/ThriftModule.java | 5 -
.../aurora/scheduler/thrift/ThriftServer.java | 107 -----------------
.../scheduler/thrift/ThriftServerLauncher.java | 116 -------------------
.../aurora/client/api/scheduler_client.py | 37 +++---
.../aurora/scheduler/app/SchedulerIT.java | 13 +--
.../aurora/client/api/test_scheduler_client.py | 2 +-
.../aurora/scheduler/app/AuroraTestKeyStore | Bin 1267 -> 0 bytes
8 files changed, 24 insertions(+), 265 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/10fce64b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index da6f5e5..08ef02c 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -231,14 +231,15 @@ public class SchedulerMain extends AbstractApplication {
LeadershipListener leaderListener = schedulerLifecycle.prepare();
- Optional<InetSocketAddress> primarySocket = serviceRegistry.getPrimarySocket();
- if (!primarySocket.isPresent()) {
- throw new IllegalStateException("No primary service registered with LocalServiceRegistry.");
+ Optional<InetSocketAddress> httpSocket =
+ Optional.fromNullable(serviceRegistry.getAuxiliarySockets().get("http"));
+ if (!httpSocket.isPresent()) {
+ throw new IllegalStateException("No HTTP service registered with LocalServiceRegistry.");
}
try {
schedulerService.lead(
- primarySocket.get(),
+ httpSocket.get(),
serviceRegistry.getAuxiliarySockets(),
leaderListener);
} catch (Group.WatchException e) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/10fce64b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java
index fc5610e..38e0c7d 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java
@@ -15,11 +15,8 @@
*/
package org.apache.aurora.scheduler.thrift;
-import javax.inject.Singleton;
-
import com.google.inject.AbstractModule;
import com.twitter.common.application.http.Registration;
-import com.twitter.common.application.modules.LifecycleModule;
import org.apache.aurora.gen.AuroraAdmin;
import org.apache.aurora.scheduler.thrift.aop.AopModule;
@@ -33,8 +30,6 @@ public class ThriftModule extends AbstractModule {
@Override
protected void configure() {
bind(AuroraAdmin.Iface.class).to(SchedulerThriftInterface.class);
- bind(ThriftServer.class).in(Singleton.class);
- LifecycleModule.bindServiceRunner(binder(), ThriftServerLauncher.class);
Registration.registerServlet(binder(), "/api", SchedulerAPIServlet.class, true);
// NOTE: GzipFilter is applied only to /api instead of globally because the Jersey-managed
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/10fce64b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServer.java b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServer.java
deleted file mode 100644
index 5b55e94..0000000
--- a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.thrift;
-
-import java.net.ServerSocket;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.thrift.Status;
-
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-
-class ThriftServer {
- private static final Logger LOG = Logger.getLogger(ThriftServer.class.getName());
-
- private TServer server = null;
-
- // Current health status of the server.
- private Status status = Status.STARTING;
-
- /**
- * Starts the server.
- * This may be called at any point except when the server is already alive. That is, it's
- * allowable to start, stop, and re-start the server.
- *
- * @param socket The socket to use.
- * @param processor The processor to handle requests.
- */
- public synchronized void start(ServerSocket socket, TProcessor processor) {
- Preconditions.checkNotNull(socket);
- Preconditions.checkNotNull(processor);
- Preconditions.checkState(status != Status.ALIVE, "Server must only be started once.");
- setStatus(Status.ALIVE);
- TThreadPoolServer.Args args = new TThreadPoolServer.Args(new TServerSocket(socket))
- .processor(processor)
- .protocolFactory(new TBinaryProtocol.Factory(false, true));
-
- final TServer starting = new TThreadPoolServer(args);
- server = starting;
- LOG.info("Starting thrift server on port " + socket.getLocalPort());
-
- Thread listeningThread = new ThreadFactoryBuilder().setDaemon(false).build()
- .newThread(new Runnable() {
- @Override
- public void run() {
- try {
- starting.serve();
- } catch (Throwable t) {
- LOG.log(Level.WARNING,
- "Uncaught exception while attempting to handle service requests: " + t, t);
- setStatus(Status.DEAD);
- }
- }
- });
-
- listeningThread.start();
- }
-
- private synchronized void setStatus(Status status) {
- LOG.info("Moving from status " + this.status + " to " + status);
- this.status = status;
- }
-
- /**
- * Attempts to shut down the server.
- * The server may be shut down at any time, though the request will be ignored if the server is
- * already stopped.
- */
- public synchronized void shutdown() {
- if (status == Status.STOPPED) {
- LOG.info("Server already stopped, shutdown request ignored.");
- return;
- }
-
- LOG.info("Received shutdown request, stopping server.");
- setStatus(Status.STOPPING);
-
- // TODO(William Farner): Figure out what happens to queued / in-process requests when the server
- // is stopped. Might want to allow a sleep period for the active requests to be completed.
-
- if (server != null) {
- server.stop();
- }
-
- server = null;
- setStatus(Status.STOPPED);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/10fce64b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServerLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServerLauncher.java b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServerLauncher.java
deleted file mode 100644
index bb93be3..0000000
--- a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServerLauncher.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.thrift;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ServerSocket;
-import java.security.GeneralSecurityException;
-import java.security.KeyStore;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
-
-import com.google.common.base.Optional;
-import com.twitter.common.application.modules.LifecycleModule.ServiceRunner;
-import com.twitter.common.application.modules.LocalServiceRegistry.LocalService;
-import com.twitter.common.base.Command;
-
-import org.apache.aurora.gen.AuroraAdmin;
-import org.apache.aurora.gen.AuroraAdmin.Iface;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Service launcher that starts up and registers the scheduler thrift server as a primary service
- * for the application.
- */
-class ThriftServerLauncher implements ServiceRunner {
-
- private static final Logger LOG = Logger.getLogger(ThriftServerLauncher.class.getName());
-
- private final ThriftConfiguration configuration;
-
- // Security is enforced via file permissions, not via this password, for what it's worth.
- private static final String SSL_KEYFILE_PASSWORD = "MesosKeyStorePassword";
-
- private final Iface schedulerThriftInterface;
- private final ThriftServer schedulerThriftServer;
-
- @Inject
- ThriftServerLauncher(
- Iface schedulerThriftInterface,
- ThriftServer schedulerThriftServer,
- ThriftConfiguration configuration) {
-
- this.schedulerThriftInterface = checkNotNull(schedulerThriftInterface);
- this.schedulerThriftServer = checkNotNull(schedulerThriftServer);
- this.configuration = checkNotNull(configuration);
- }
-
- @Override
- public LocalService launch() {
- ServerSocket socket = getServerSocket();
- schedulerThriftServer.start(
- socket,
- new AuroraAdmin.Processor<>(schedulerThriftInterface));
-
- Command shutdown = new Command() {
- @Override
- public void execute() {
- LOG.info("Stopping thrift server.");
- schedulerThriftServer.shutdown();
- }
- };
-
- return LocalService.primaryService(socket.getLocalPort(), shutdown);
- }
-
- private ServerSocket getServerSocket() {
- try {
- Optional<? extends InputStream> sslKeyStream = configuration.getSslKeyStream();
- if (!sslKeyStream.isPresent()) {
- LOG.warning("Running Thrift Server without SSL.");
- return new ServerSocket(configuration.getServingPort());
- } else {
- // TODO(Kevin Sweeney): Add helper to perform this keyfile import.
- KeyStore ks = KeyStore.getInstance("JKS");
- ks.load(sslKeyStream.get(), SSL_KEYFILE_PASSWORD.toCharArray());
-
- KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
- kmf.init(ks, SSL_KEYFILE_PASSWORD.toCharArray());
-
- SSLContext ctx = SSLContext.getInstance("TLS");
- ctx.init(kmf.getKeyManagers(), null, null);
-
- SSLServerSocketFactory ssf = ctx.getServerSocketFactory();
- SSLServerSocket serverSocket = (SSLServerSocket) ssf.createServerSocket(
- configuration.getServingPort());
- serverSocket.setEnabledCipherSuites(serverSocket.getSupportedCipherSuites());
- serverSocket.setNeedClientAuth(false);
- return serverSocket;
- }
- } catch (IOException e) {
- throw new RuntimeException("Failed to read key file.", e);
- } catch (GeneralSecurityException e) {
- throw new RuntimeException("SSL setup failed.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/10fce64b/src/main/python/apache/aurora/client/api/scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/scheduler_client.py b/src/main/python/apache/aurora/client/api/scheduler_client.py
index ff5d2bc..75053b7 100644
--- a/src/main/python/apache/aurora/client/api/scheduler_client.py
+++ b/src/main/python/apache/aurora/client/api/scheduler_client.py
@@ -20,11 +20,10 @@ import time
import traceback
from pystachio import Boolean, Default, Integer, String
-from thrift.protocol import TBinaryProtocol
-from thrift.transport import TSocket, TTransport
+from thrift.protocol import TJSONProtocol
+from thrift.transport import THttpClient, TTransport
from twitter.common import log
from twitter.common.quantity import Amount, Time
-from twitter.common.rpc.transports.tsslsocket import DelayedHandshakeTSSLSocket
from twitter.common.zookeeper.kazoo_client import TwitterKazooClient
from twitter.common.zookeeper.serverset import ServerSet
@@ -42,7 +41,6 @@ class SchedulerClientTrait(Cluster.Trait):
scheduler_uri = String
proxy_url = String
auth_mechanism = Default(String, 'UNAUTHENTICATED')
- use_thrift_ssl = Default(Boolean, False)
class SchedulerClient(object):
@@ -61,21 +59,20 @@ class SchedulerClient(object):
cluster = cluster.with_trait(SchedulerClientTrait)
if cluster.zk:
return ZookeeperSchedulerClient(
- cluster, port=cluster.zk_port, ssl=cluster.use_thrift_ssl, **kwargs)
+ cluster, port=cluster.zk_port, **kwargs)
elif cluster.scheduler_uri:
try:
host, port = cluster.scheduler_uri.split(':', 2)
port = int(port)
except ValueError:
raise ValueError('Malformed Cluster scheduler_uri: %s' % cluster.scheduler_uri)
- return DirectSchedulerClient(host, port, ssl=cluster.use_thrift_ssl)
+ return DirectSchedulerClient(host, port)
else:
raise ValueError('"cluster" does not specify zk or scheduler_uri')
- def __init__(self, verbose=False, ssl=False):
+ def __init__(self, verbose=False):
self._client = None
self._verbose = verbose
- self._ssl = ssl
def get_thrift_client(self):
if self._client is None:
@@ -88,13 +85,9 @@ class SchedulerClient(object):
return None
@staticmethod
- def _connect_scheduler(host, port, with_ssl=False):
- if with_ssl:
- socket = DelayedHandshakeTSSLSocket(host, port, delay_handshake=True, validate=False)
- else:
- socket = TSocket.TSocket(host, port)
- transport = TTransport.TBufferedTransport(socket)
- protocol = TBinaryProtocol.TBinaryProtocol(transport)
+ def _connect_scheduler(host, port):
+ transport = THttpClient('http://%s:%s/api' % (host, port))
+ protocol = TJSONProtocol.TJSONProtocol(transport)
schedulerClient = AuroraAdmin.Client(protocol)
for _ in range(SchedulerClient.THRIFT_RETRIES):
try:
@@ -123,11 +116,10 @@ class ZookeeperSchedulerClient(SchedulerClient):
zk = TwitterKazooClient.make(str('%s:%s' % (cluster.zk, port)), verbose=verbose)
return zk, ServerSet(zk, cluster.scheduler_zk_path, **kw)
- def __init__(self, cluster, port=2181, ssl=False, verbose=False):
- SchedulerClient.__init__(self, verbose=verbose, ssl=ssl)
+ def __init__(self, cluster, port=2181, verbose=False):
+ SchedulerClient.__init__(self, verbose=verbose)
self._cluster = cluster
self._zkport = port
- self._endpoint = None
self._http = None
def _connect(self):
@@ -141,10 +133,9 @@ class ZookeeperSchedulerClient(SchedulerClient):
if len(serverset_endpoints) == 0:
raise self.CouldNotConnect('No schedulers detected in %s!' % self._cluster.name)
instance = serverset_endpoints[0]
- self._endpoint = instance.service_endpoint
self._http = instance.additional_endpoints.get('http')
zk.stop()
- return self._connect_scheduler(self._endpoint.host, self._endpoint.port, self._ssl)
+ return self._connect_scheduler(self._http.host, self._http.port)
@property
def url(self):
@@ -158,13 +149,13 @@ class ZookeeperSchedulerClient(SchedulerClient):
class DirectSchedulerClient(SchedulerClient):
- def __init__(self, host, port, ssl=False):
- SchedulerClient.__init__(self, verbose=True, ssl=ssl)
+ def __init__(self, host, port):
+ SchedulerClient.__init__(self, verbose=True)
self._host = host
self._port = port
def _connect(self):
- return self._connect_scheduler(self._host, self._port, with_ssl=self._ssl)
+ return self._connect_scheduler(self._host, self._port)
@property
def url(self):
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/10fce64b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index df5a5da..e75a8e1 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -36,7 +36,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import com.google.common.net.HostAndPort;
import com.google.common.testing.TearDown;
import com.google.common.util.concurrent.Atomics;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -62,7 +61,6 @@ import com.twitter.common.zookeeper.ZooKeeperClient;
import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule;
import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig;
import com.twitter.common.zookeeper.testing.BaseZooKeeperTest;
-import com.twitter.thrift.Endpoint;
import com.twitter.thrift.ServiceInstance;
import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
@@ -237,19 +235,16 @@ public class SchedulerIT extends BaseZooKeeperTest {
});
}
- private HostAndPort awaitSchedulerReady() throws Exception {
- return executor.submit(new Callable<HostAndPort>() {
+ private void awaitSchedulerReady() throws Exception {
+ executor.submit(new Callable<Void>() {
@Override
- public HostAndPort call() throws Exception {
- final AtomicReference<HostAndPort> thriftEndpoint = Atomics.newReference();
+ public Void call() throws Exception {
ServerSet schedulerService = new ServerSetImpl(zkClient, SERVERSET_PATH);
final CountDownLatch schedulerReady = new CountDownLatch(1);
schedulerService.watch(new HostChangeMonitor<ServiceInstance>() {
@Override
public void onChange(ImmutableSet<ServiceInstance> hostSet) {
if (!hostSet.isEmpty()) {
- Endpoint endpoint = Iterables.getOnlyElement(hostSet).getServiceEndpoint();
- thriftEndpoint.set(HostAndPort.fromParts(endpoint.getHost(), endpoint.getPort()));
schedulerReady.countDown();
}
}
@@ -257,7 +252,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
// A timeout is used because certain types of assertion errors (mocks) will not surface
// until the main test thread exits this body of code.
assertTrue(schedulerReady.await(5L, TimeUnit.MINUTES));
- return thriftEndpoint.get();
+ return null;
}
}).get();
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/10fce64b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
index 399d4c6..7bf2c32 100644
--- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py
+++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
@@ -322,7 +322,7 @@ class TestZookeeperSchedulerClient(unittest.TestCase):
zk_scheduler_client = TestZookeeperSchedulerClient(Cluster(proxy_url=None))
self.mox.StubOutWithMock(zk_scheduler_client, '_connect_scheduler')
mock_zk.stop()
- zk_scheduler_client._connect_scheduler(host, port, False)
+ zk_scheduler_client._connect_scheduler(host, port)
self.mox.ReplayAll()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/10fce64b/src/test/resources/org/apache/aurora/scheduler/app/AuroraTestKeyStore
----------------------------------------------------------------------
diff --git a/src/test/resources/org/apache/aurora/scheduler/app/AuroraTestKeyStore b/src/test/resources/org/apache/aurora/scheduler/app/AuroraTestKeyStore
deleted file mode 100644
index 1157471..0000000
Binary files a/src/test/resources/org/apache/aurora/scheduler/app/AuroraTestKeyStore and /dev/null differ