You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/05/19 23:20:41 UTC

git commit: Only serve thrift over HTTP.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master b9ab5ff99 -> 10fce64b3


Only serve thrift over HTTP.

Bugs closed: AURORA-342

Reviewed at https://reviews.apache.org/r/21294/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/10fce64b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/10fce64b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/10fce64b

Branch: refs/heads/master
Commit: 10fce64b35a486dfc229dd7ba989f6dc73680032
Parents: b9ab5ff
Author: Bill Farner <wf...@apache.org>
Authored: Mon May 19 14:04:55 2014 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Mon May 19 14:04:55 2014 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/app/SchedulerMain.java     |   9 +-
 .../aurora/scheduler/thrift/ThriftModule.java   |   5 -
 .../aurora/scheduler/thrift/ThriftServer.java   | 107 -----------------
 .../scheduler/thrift/ThriftServerLauncher.java  | 116 -------------------
 .../aurora/client/api/scheduler_client.py       |  37 +++---
 .../aurora/scheduler/app/SchedulerIT.java       |  13 +--
 .../aurora/client/api/test_scheduler_client.py  |   2 +-
 .../aurora/scheduler/app/AuroraTestKeyStore     | Bin 1267 -> 0 bytes
 8 files changed, 24 insertions(+), 265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/10fce64b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index da6f5e5..08ef02c 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -231,14 +231,15 @@ public class SchedulerMain extends AbstractApplication {
 
     LeadershipListener leaderListener = schedulerLifecycle.prepare();
 
-    Optional<InetSocketAddress> primarySocket = serviceRegistry.getPrimarySocket();
-    if (!primarySocket.isPresent()) {
-      throw new IllegalStateException("No primary service registered with LocalServiceRegistry.");
+    Optional<InetSocketAddress> httpSocket =
+        Optional.fromNullable(serviceRegistry.getAuxiliarySockets().get("http"));
+    if (!httpSocket.isPresent()) {
+      throw new IllegalStateException("No HTTP service registered with LocalServiceRegistry.");
     }
 
     try {
       schedulerService.lead(
-          primarySocket.get(),
+          httpSocket.get(),
           serviceRegistry.getAuxiliarySockets(),
           leaderListener);
     } catch (Group.WatchException e) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/10fce64b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java
index fc5610e..38e0c7d 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftModule.java
@@ -15,11 +15,8 @@
  */
 package org.apache.aurora.scheduler.thrift;
 
-import javax.inject.Singleton;
-
 import com.google.inject.AbstractModule;
 import com.twitter.common.application.http.Registration;
-import com.twitter.common.application.modules.LifecycleModule;
 
 import org.apache.aurora.gen.AuroraAdmin;
 import org.apache.aurora.scheduler.thrift.aop.AopModule;
@@ -33,8 +30,6 @@ public class ThriftModule extends AbstractModule {
   @Override
   protected void configure() {
     bind(AuroraAdmin.Iface.class).to(SchedulerThriftInterface.class);
-    bind(ThriftServer.class).in(Singleton.class);
-    LifecycleModule.bindServiceRunner(binder(), ThriftServerLauncher.class);
 
     Registration.registerServlet(binder(), "/api", SchedulerAPIServlet.class, true);
     // NOTE: GzipFilter is applied only to /api instead of globally because the Jersey-managed

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/10fce64b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServer.java b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServer.java
deleted file mode 100644
index 5b55e94..0000000
--- a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.thrift;
-
-import java.net.ServerSocket;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.thrift.Status;
-
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-
-class ThriftServer {
-  private static final Logger LOG = Logger.getLogger(ThriftServer.class.getName());
-
-  private TServer server = null;
-
-  // Current health status of the server.
-  private Status status = Status.STARTING;
-
-  /**
-   * Starts the server.
-   * This may be called at any point except when the server is already alive.  That is, it's
-   * allowable to start, stop, and re-start the server.
-   *
-   * @param socket The socket to use.
-   * @param processor The processor to handle requests.
-   */
-  public synchronized void start(ServerSocket socket, TProcessor processor) {
-    Preconditions.checkNotNull(socket);
-    Preconditions.checkNotNull(processor);
-    Preconditions.checkState(status != Status.ALIVE, "Server must only be started once.");
-    setStatus(Status.ALIVE);
-    TThreadPoolServer.Args args = new TThreadPoolServer.Args(new TServerSocket(socket))
-        .processor(processor)
-        .protocolFactory(new TBinaryProtocol.Factory(false, true));
-
-    final TServer starting = new TThreadPoolServer(args);
-    server = starting;
-    LOG.info("Starting thrift server on port " + socket.getLocalPort());
-
-    Thread listeningThread = new ThreadFactoryBuilder().setDaemon(false).build()
-        .newThread(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              starting.serve();
-            } catch (Throwable t) {
-              LOG.log(Level.WARNING,
-                  "Uncaught exception while attempting to handle service requests: " + t, t);
-              setStatus(Status.DEAD);
-            }
-          }
-    });
-
-    listeningThread.start();
-  }
-
-  private synchronized void setStatus(Status status) {
-    LOG.info("Moving from status " + this.status + " to " + status);
-    this.status = status;
-  }
-
-  /**
-   * Attempts to shut down the server.
-   * The server may be shut down at any time, though the request will be ignored if the server is
-   * already stopped.
-   */
-  public synchronized void shutdown() {
-    if (status == Status.STOPPED) {
-      LOG.info("Server already stopped, shutdown request ignored.");
-      return;
-    }
-
-    LOG.info("Received shutdown request, stopping server.");
-    setStatus(Status.STOPPING);
-
-    // TODO(William Farner): Figure out what happens to queued / in-process requests when the server
-    // is stopped.  Might want to allow a sleep period for the active requests to be completed.
-
-    if (server != null) {
-      server.stop();
-    }
-
-    server = null;
-    setStatus(Status.STOPPED);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/10fce64b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServerLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServerLauncher.java b/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServerLauncher.java
deleted file mode 100644
index bb93be3..0000000
--- a/src/main/java/org/apache/aurora/scheduler/thrift/ThriftServerLauncher.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.thrift;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ServerSocket;
-import java.security.GeneralSecurityException;
-import java.security.KeyStore;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
-
-import com.google.common.base.Optional;
-import com.twitter.common.application.modules.LifecycleModule.ServiceRunner;
-import com.twitter.common.application.modules.LocalServiceRegistry.LocalService;
-import com.twitter.common.base.Command;
-
-import org.apache.aurora.gen.AuroraAdmin;
-import org.apache.aurora.gen.AuroraAdmin.Iface;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Service launcher that starts up and registers the scheduler thrift server as a primary service
- * for the application.
- */
-class ThriftServerLauncher implements ServiceRunner {
-
-  private static final Logger LOG = Logger.getLogger(ThriftServerLauncher.class.getName());
-
-  private final ThriftConfiguration configuration;
-
-  // Security is enforced via file permissions, not via this password, for what it's worth.
-  private static final String SSL_KEYFILE_PASSWORD = "MesosKeyStorePassword";
-
-  private final Iface schedulerThriftInterface;
-  private final ThriftServer schedulerThriftServer;
-
-  @Inject
-  ThriftServerLauncher(
-      Iface schedulerThriftInterface,
-      ThriftServer schedulerThriftServer,
-      ThriftConfiguration configuration) {
-
-    this.schedulerThriftInterface = checkNotNull(schedulerThriftInterface);
-    this.schedulerThriftServer = checkNotNull(schedulerThriftServer);
-    this.configuration = checkNotNull(configuration);
-  }
-
-  @Override
-  public LocalService launch() {
-    ServerSocket socket = getServerSocket();
-    schedulerThriftServer.start(
-        socket,
-        new AuroraAdmin.Processor<>(schedulerThriftInterface));
-
-    Command shutdown = new Command() {
-      @Override
-      public void execute() {
-        LOG.info("Stopping thrift server.");
-        schedulerThriftServer.shutdown();
-      }
-    };
-
-    return LocalService.primaryService(socket.getLocalPort(), shutdown);
-  }
-
-  private ServerSocket getServerSocket() {
-    try {
-      Optional<? extends InputStream> sslKeyStream = configuration.getSslKeyStream();
-      if (!sslKeyStream.isPresent()) {
-        LOG.warning("Running Thrift Server without SSL.");
-        return new ServerSocket(configuration.getServingPort());
-      } else {
-        // TODO(Kevin Sweeney): Add helper to perform this keyfile import.
-        KeyStore ks = KeyStore.getInstance("JKS");
-        ks.load(sslKeyStream.get(), SSL_KEYFILE_PASSWORD.toCharArray());
-
-        KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
-        kmf.init(ks, SSL_KEYFILE_PASSWORD.toCharArray());
-
-        SSLContext ctx = SSLContext.getInstance("TLS");
-        ctx.init(kmf.getKeyManagers(), null, null);
-
-        SSLServerSocketFactory ssf = ctx.getServerSocketFactory();
-        SSLServerSocket serverSocket = (SSLServerSocket) ssf.createServerSocket(
-            configuration.getServingPort());
-        serverSocket.setEnabledCipherSuites(serverSocket.getSupportedCipherSuites());
-        serverSocket.setNeedClientAuth(false);
-        return serverSocket;
-      }
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to read key file.", e);
-    } catch (GeneralSecurityException e) {
-      throw new RuntimeException("SSL setup failed.", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/10fce64b/src/main/python/apache/aurora/client/api/scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/scheduler_client.py b/src/main/python/apache/aurora/client/api/scheduler_client.py
index ff5d2bc..75053b7 100644
--- a/src/main/python/apache/aurora/client/api/scheduler_client.py
+++ b/src/main/python/apache/aurora/client/api/scheduler_client.py
@@ -20,11 +20,10 @@ import time
 import traceback
 
 from pystachio import Boolean, Default, Integer, String
-from thrift.protocol import TBinaryProtocol
-from thrift.transport import TSocket, TTransport
+from thrift.protocol import TJSONProtocol
+from thrift.transport import THttpClient, TTransport
 from twitter.common import log
 from twitter.common.quantity import Amount, Time
-from twitter.common.rpc.transports.tsslsocket import DelayedHandshakeTSSLSocket
 from twitter.common.zookeeper.kazoo_client import TwitterKazooClient
 from twitter.common.zookeeper.serverset import ServerSet
 
@@ -42,7 +41,6 @@ class SchedulerClientTrait(Cluster.Trait):
   scheduler_uri     = String
   proxy_url         = String
   auth_mechanism    = Default(String, 'UNAUTHENTICATED')
-  use_thrift_ssl    = Default(Boolean, False)
 
 
 class SchedulerClient(object):
@@ -61,21 +59,20 @@ class SchedulerClient(object):
     cluster = cluster.with_trait(SchedulerClientTrait)
     if cluster.zk:
       return ZookeeperSchedulerClient(
-          cluster, port=cluster.zk_port, ssl=cluster.use_thrift_ssl, **kwargs)
+          cluster, port=cluster.zk_port, **kwargs)
     elif cluster.scheduler_uri:
       try:
         host, port = cluster.scheduler_uri.split(':', 2)
         port = int(port)
       except ValueError:
         raise ValueError('Malformed Cluster scheduler_uri: %s' % cluster.scheduler_uri)
-      return DirectSchedulerClient(host, port, ssl=cluster.use_thrift_ssl)
+      return DirectSchedulerClient(host, port)
     else:
       raise ValueError('"cluster" does not specify zk or scheduler_uri')
 
-  def __init__(self, verbose=False, ssl=False):
+  def __init__(self, verbose=False):
     self._client = None
     self._verbose = verbose
-    self._ssl = ssl
 
   def get_thrift_client(self):
     if self._client is None:
@@ -88,13 +85,9 @@ class SchedulerClient(object):
     return None
 
   @staticmethod
-  def _connect_scheduler(host, port, with_ssl=False):
-    if with_ssl:
-      socket = DelayedHandshakeTSSLSocket(host, port, delay_handshake=True, validate=False)
-    else:
-      socket = TSocket.TSocket(host, port)
-    transport = TTransport.TBufferedTransport(socket)
-    protocol = TBinaryProtocol.TBinaryProtocol(transport)
+  def _connect_scheduler(host, port):
+    transport = THttpClient('http://%s:%s/api' % (host, port))
+    protocol = TJSONProtocol.TJSONProtocol(transport)
     schedulerClient = AuroraAdmin.Client(protocol)
     for _ in range(SchedulerClient.THRIFT_RETRIES):
       try:
@@ -123,11 +116,10 @@ class ZookeeperSchedulerClient(SchedulerClient):
     zk = TwitterKazooClient.make(str('%s:%s' % (cluster.zk, port)), verbose=verbose)
     return zk, ServerSet(zk, cluster.scheduler_zk_path, **kw)
 
-  def __init__(self, cluster, port=2181, ssl=False, verbose=False):
-    SchedulerClient.__init__(self, verbose=verbose, ssl=ssl)
+  def __init__(self, cluster, port=2181, verbose=False):
+    SchedulerClient.__init__(self, verbose=verbose)
     self._cluster = cluster
     self._zkport = port
-    self._endpoint = None
     self._http = None
 
   def _connect(self):
@@ -141,10 +133,9 @@ class ZookeeperSchedulerClient(SchedulerClient):
     if len(serverset_endpoints) == 0:
       raise self.CouldNotConnect('No schedulers detected in %s!' % self._cluster.name)
     instance = serverset_endpoints[0]
-    self._endpoint = instance.service_endpoint
     self._http = instance.additional_endpoints.get('http')
     zk.stop()
-    return self._connect_scheduler(self._endpoint.host, self._endpoint.port, self._ssl)
+    return self._connect_scheduler(self._http.host, self._http.port)
 
   @property
   def url(self):
@@ -158,13 +149,13 @@ class ZookeeperSchedulerClient(SchedulerClient):
 
 
 class DirectSchedulerClient(SchedulerClient):
-  def __init__(self, host, port, ssl=False):
-    SchedulerClient.__init__(self, verbose=True, ssl=ssl)
+  def __init__(self, host, port):
+    SchedulerClient.__init__(self, verbose=True)
     self._host = host
     self._port = port
 
   def _connect(self):
-    return self._connect_scheduler(self._host, self._port, with_ssl=self._ssl)
+    return self._connect_scheduler(self._host, self._port)
 
   @property
   def url(self):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/10fce64b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index df5a5da..e75a8e1 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -36,7 +36,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.net.HostAndPort;
 import com.google.common.testing.TearDown;
 import com.google.common.util.concurrent.Atomics;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -62,7 +61,6 @@ import com.twitter.common.zookeeper.ZooKeeperClient;
 import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule;
 import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig;
 import com.twitter.common.zookeeper.testing.BaseZooKeeperTest;
-import com.twitter.thrift.Endpoint;
 import com.twitter.thrift.ServiceInstance;
 
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
@@ -237,19 +235,16 @@ public class SchedulerIT extends BaseZooKeeperTest {
     });
   }
 
-  private HostAndPort awaitSchedulerReady() throws Exception {
-    return executor.submit(new Callable<HostAndPort>() {
+  private void awaitSchedulerReady() throws Exception {
+    executor.submit(new Callable<Void>() {
       @Override
-      public HostAndPort call() throws Exception {
-        final AtomicReference<HostAndPort> thriftEndpoint = Atomics.newReference();
+      public Void call() throws Exception {
         ServerSet schedulerService = new ServerSetImpl(zkClient, SERVERSET_PATH);
         final CountDownLatch schedulerReady = new CountDownLatch(1);
         schedulerService.watch(new HostChangeMonitor<ServiceInstance>() {
           @Override
           public void onChange(ImmutableSet<ServiceInstance> hostSet) {
             if (!hostSet.isEmpty()) {
-              Endpoint endpoint = Iterables.getOnlyElement(hostSet).getServiceEndpoint();
-              thriftEndpoint.set(HostAndPort.fromParts(endpoint.getHost(), endpoint.getPort()));
               schedulerReady.countDown();
             }
           }
@@ -257,7 +252,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
         // A timeout is used because certain types of assertion errors (mocks) will not surface
         // until the main test thread exits this body of code.
         assertTrue(schedulerReady.await(5L, TimeUnit.MINUTES));
-        return thriftEndpoint.get();
+        return null;
       }
     }).get();
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/10fce64b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
index 399d4c6..7bf2c32 100644
--- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py
+++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
@@ -322,7 +322,7 @@ class TestZookeeperSchedulerClient(unittest.TestCase):
       zk_scheduler_client = TestZookeeperSchedulerClient(Cluster(proxy_url=None))
       self.mox.StubOutWithMock(zk_scheduler_client, '_connect_scheduler')
       mock_zk.stop()
-      zk_scheduler_client._connect_scheduler(host, port, False)
+      zk_scheduler_client._connect_scheduler(host, port)
 
       self.mox.ReplayAll()
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/10fce64b/src/test/resources/org/apache/aurora/scheduler/app/AuroraTestKeyStore
----------------------------------------------------------------------
diff --git a/src/test/resources/org/apache/aurora/scheduler/app/AuroraTestKeyStore b/src/test/resources/org/apache/aurora/scheduler/app/AuroraTestKeyStore
deleted file mode 100644
index 1157471..0000000
Binary files a/src/test/resources/org/apache/aurora/scheduler/app/AuroraTestKeyStore and /dev/null differ