You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dm...@apache.org on 2019/01/23 20:13:48 UTC

[camel] branch camel-2.x updated: CAMEL-12213: Update camel-thrift to libthrift 0.12.0

This is an automated email from the ASF dual-hosted git repository.

dmvolod pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.x by this push:
     new 14dcc47  CAMEL-12213: Update camel-thrift to libthrift 0.12.0
14dcc47 is described below

commit 14dcc47a718033a0cb52307eaf257c6d03ae494f
Author: Dmitry Volodin <dm...@gmail.com>
AuthorDate: Wed Jan 23 22:36:12 2019 +0300

    CAMEL-12213: Update camel-thrift to libthrift 0.12.0
---
 .../camel/component/thrift/ThriftConsumer.java     |  16 +-
 .../camel/component/thrift/ThriftEndpoint.java     |   1 +
 .../camel/component/thrift/ThriftProducer.java     |   8 +-
 .../thrift/server/ThriftThreadPoolServer.java      | 221 +--------------------
 .../thrift/ThriftConsumerSecurityTest.java         |   8 +-
 .../thrift/ThriftProducerSecurityTest.java         |   8 +-
 parent/pom.xml                                     |   2 +-
 7 files changed, 36 insertions(+), 228 deletions(-)

diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java
index 395b7a2..ed08011 100644
--- a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.thrift;
 
+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -40,8 +41,6 @@ import org.apache.thrift.transport.TSSLTransportFactory;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TZlibTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Represents Thrift server consumer implementation
@@ -96,7 +95,7 @@ public class ThriftConsumer extends DefaultConsumer {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected void initializeServer() throws TTransportException {
+    protected void initializeServer() throws TTransportException, IOException {
         Class serverImplementationClass;
         Object serverImplementationInstance;
         Object serverProcessor;
@@ -117,7 +116,9 @@ public class ThriftConsumer extends DefaultConsumer {
         }
 
         if (configuration.getNegotiationType() == ThriftNegotiationType.SSL && endpoint.isSynchronous()) {
+            ClassResolver classResolver = endpoint.getCamelContext().getClassResolver();
             SSLContextParameters sslParameters = configuration.getSslParameters();
+            
             if (sslParameters == null) {
                 throw new IllegalArgumentException("SSL parameters must be initialized if negotiation type is set to " + configuration.getNegotiationType());
             }
@@ -132,10 +133,13 @@ public class ThriftConsumer extends DefaultConsumer {
                                                      : sslParameters.getCipherSuites().getCipherSuite().stream().toArray(String[]::new));
             
             if (ObjectHelper.isNotEmpty(sslParameters.getKeyManagers().getKeyStore().getProvider()) && ObjectHelper.isNotEmpty(sslParameters.getKeyManagers().getKeyStore().getType())) {
-                sslParams.setKeyStore(sslParameters.getKeyManagers().getKeyStore().getResource(), sslParameters.getKeyManagers().getKeyStore().getPassword(),
-                                      sslParameters.getKeyManagers().getKeyStore().getProvider(), sslParameters.getKeyManagers().getKeyStore().getType());
+                sslParams.setKeyStore(ResourceHelper.resolveResourceAsInputStream(classResolver, sslParameters.getKeyManagers().getKeyStore().getResource()),
+                                      sslParameters.getKeyManagers().getKeyStore().getPassword(),
+                                      sslParameters.getKeyManagers().getKeyStore().getProvider(),
+                                      sslParameters.getKeyManagers().getKeyStore().getType());
             } else {
-                sslParams.setKeyStore(sslParameters.getKeyManagers().getKeyStore().getResource(), sslParameters.getKeyManagers().getKeyStore().getPassword());
+                sslParams.setKeyStore(ResourceHelper.resolveResourceAsInputStream(classResolver, sslParameters.getKeyManagers().getKeyStore().getResource()),
+                                      sslParameters.getKeyManagers().getKeyStore().getPassword());
             }
 
             try {
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftEndpoint.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftEndpoint.java
index ffd79c3..53035a4 100644
--- a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftEndpoint.java
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftEndpoint.java
@@ -23,6 +23,7 @@ import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.SynchronousDelegateProducer;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
+import org.apache.camel.support.DefaultEndpoint;
 
 /**
  * The Thrift component allows to call and expose remote procedures (RPC) with
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java
index 5e27227..481c4f5 100644
--- a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java
@@ -157,7 +157,7 @@ public class ThriftProducer extends DefaultProducer implements AsyncProcessor {
         }
     }
     
-    protected void initializeSslTransport() throws TTransportException {
+    protected void initializeSslTransport() throws TTransportException, IOException {
         if (!ObjectHelper.isEmpty(configuration.getHost()) && !ObjectHelper.isEmpty(configuration.getPort())) {
             SSLContextParameters sslParameters = configuration.getSslParameters();
             if (sslParameters == null) {
@@ -176,8 +176,10 @@ public class ThriftProducer extends DefaultProducer implements AsyncProcessor {
                                                                          : sslParameters.getCipherSuites().getCipherSuite().stream().toArray(String[]::new));
             
             if (ObjectHelper.isNotEmpty(sslParameters.getTrustManagers().getProvider()) && ObjectHelper.isNotEmpty(sslParameters.getTrustManagers().getKeyStore().getType())) {
-                sslParams.setTrustStore(sslParameters.getTrustManagers().getKeyStore().getResource(), sslParameters.getTrustManagers().getKeyStore().getPassword(),
-                                        sslParameters.getTrustManagers().getProvider(), sslParameters.getTrustManagers().getKeyStore().getType());
+                sslParams.setTrustStore(ResourceHelper.resolveResourceAsInputStream(classResolver, sslParameters.getTrustManagers().getKeyStore().getResource()),
+                                        sslParameters.getTrustManagers().getKeyStore().getPassword(),
+                                        sslParameters.getTrustManagers().getProvider(),
+                                        sslParameters.getTrustManagers().getKeyStore().getType());
             } else {
                 sslParams.setTrustStore(sslParameters.getTrustManagers().getKeyStore().getResource(), sslParameters.getTrustManagers().getKeyStore().getPassword());
             }
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftThreadPoolServer.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftThreadPoolServer.java
index 2c99c98..65ca03a 100644
--- a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftThreadPoolServer.java
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftThreadPoolServer.java
@@ -16,72 +16,26 @@
  */
 package org.apache.camel.component.thrift.server;
 
-import java.util.Random;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
-import org.apache.thrift.TException;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.ServerContext;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.transport.TSaslTransportException;
+import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 
 /*
  * Thrift ThreadPoolServer implementation with executors controlled by the Camel Executor Service Manager
  */
-public class ThriftThreadPoolServer extends TServer {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ThriftThreadPoolServer.class.getName());
+public class ThriftThreadPoolServer extends TThreadPoolServer {
 
-    public static class Args extends AbstractServerArgs<Args> {
-        private ExecutorService executorService;
+    public static class Args extends TThreadPoolServer.Args {
         private ExecutorService startThreadPool;
         private CamelContext context;
-
-        private int requestTimeout = 20;
-        private TimeUnit requestTimeoutUnit = TimeUnit.SECONDS;
-        private int beBackoffSlotLength = 100;
-        private TimeUnit beBackoffSlotLengthUnit = TimeUnit.MILLISECONDS;
-
+        
         public Args(TServerTransport transport) {
             super(transport);
         }
 
-        public Args requestTimeout(int n) {
-            requestTimeout = n;
-            return this;
-        }
-
-        public Args requestTimeoutUnit(TimeUnit tu) {
-            requestTimeoutUnit = tu;
-            return this;
-        }
-
-        // Binary exponential backoff slot length
-        public Args beBackoffSlotLength(int n) {
-            beBackoffSlotLength = n;
-            return this;
-        }
-
-        // Binary exponential backoff slot time unit
-        public Args beBackoffSlotLengthUnit(TimeUnit tu) {
-            beBackoffSlotLengthUnit = tu;
-            return this;
-        }
-
-        public Args executorService(ExecutorService executorService) {
-            this.executorService = executorService;
-            return this;
-        }
-
         public Args startThreadPool(ExecutorService startThreadPool) {
             this.startThreadPool = startThreadPool;
             return this;
@@ -94,188 +48,35 @@ public class ThriftThreadPoolServer extends TServer {
     }
 
     // Executor service for handling client connections
-    private final ExecutorService invoker;
     private final CamelContext context;
     private final ExecutorService startExecutor;
 
-    private final TimeUnit requestTimeoutUnit;
-
-    private final long requestTimeout;
-
-    private final long beBackoffSlotInMillis;
-
-    private Random random = new Random(System.currentTimeMillis());
 
     public ThriftThreadPoolServer(Args args) {
         super(args);
 
-        requestTimeoutUnit = args.requestTimeoutUnit;
-        requestTimeout = args.requestTimeout;
-        beBackoffSlotInMillis = args.beBackoffSlotLengthUnit.toMillis(args.beBackoffSlotLength);
-
         context = args.context;
-        invoker = args.executorService;
         startExecutor = args.startThreadPool;
     }
 
+    @Override
     public void serve() {
-        try {
-            serverTransport_.listen();
-        } catch (TTransportException ttx) {
-            LOGGER.error("Error occurred during listening.", ttx);
+        if (!preServe()) {
             return;
         }
 
-        // Run the preServe event
-        if (eventHandler_ != null) {
-            eventHandler_.preServe();
-        }
-
         startExecutor.execute(() -> {
-            stopped_ = false;
-            setServing(true);
-            
+            execute();
             waitForShutdown();
             
-            context.getExecutorServiceManager().shutdownGraceful(invoker);
+            context.getExecutorServiceManager().shutdownGraceful(getExecutorService());
             setServing(false);
         });
     }
 
-    private void waitForShutdown() {
-        while (!stopped_) {
-            try {
-                TTransport client = serverTransport_.accept();
-                WorkerProcess wp = new WorkerProcess(client);
-
-                int retryCount = 0;
-                long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);
-                while (true) {
-                    try {
-                        invoker.execute(wp);
-                        break;
-                    } catch (Throwable t) {
-                        if (t instanceof RejectedExecutionException) {
-                            retryCount++;
-                            try {
-                                if (remainTimeInMillis > 0) {
-                                    // do a truncated 20 binary exponential
-                                    // backoff sleep
-                                    long sleepTimeInMillis = ((long)(random.nextDouble() * (1L << Math.min(retryCount, 20)))) * beBackoffSlotInMillis;
-                                    sleepTimeInMillis = Math.min(sleepTimeInMillis, remainTimeInMillis);
-                                    TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
-                                    remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis;
-                                } else {
-                                    client.close();
-                                    wp = null;
-                                    LOGGER.warn("Task has been rejected by ExecutorService " + retryCount + " times till timedout, reason: " + t);
-                                    break;
-                                }
-                            } catch (InterruptedException e) {
-                                LOGGER.warn("Interrupted while waiting to place client on executor queue.");
-                                Thread.currentThread().interrupt();
-                                break;
-                            }
-                        } else if (t instanceof Error) {
-                            LOGGER.error("ExecutorService threw error: " + t, t);
-                            throw (Error)t;
-                        } else {
-                            // for other possible runtime errors from
-                            // ExecutorService, should also not kill serve
-                            LOGGER.warn("ExecutorService threw error: " + t, t);
-                            break;
-                        }
-                    }
-                }
-            } catch (TTransportException ttx) {
-                if (!stopped_) {
-                    LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
-                }
-            }
-        }
-    }
-
+    @Override
     public void stop() {
-        stopped_ = true;
-        serverTransport_.interrupt();
+        super.stop();
         context.getExecutorServiceManager().shutdownGraceful(startExecutor);
     }
-
-    private final class WorkerProcess implements Runnable {
-
-        /**
-         * Client that this services.
-         */
-        private TTransport client;
-
-        /**
-         * Default constructor.
-         *
-         * @param client Transport to process
-         */
-        private WorkerProcess(TTransport client) {
-            this.client = client;
-        }
-
-        /**
-         * Loops on processing a client forever
-         */
-        public void run() {
-            TProcessor processor = null;
-            TTransport inputTransport = null;
-            TTransport outputTransport = null;
-            TProtocol inputProtocol = null;
-            TProtocol outputProtocol = null;
-
-            TServerEventHandler eventHandler = null;
-            ServerContext connectionContext = null;
-
-            try {
-                processor = processorFactory_.getProcessor(client);
-                inputTransport = inputTransportFactory_.getTransport(client);
-                outputTransport = outputTransportFactory_.getTransport(client);
-                inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
-                outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
-
-                eventHandler = getEventHandler();
-                if (eventHandler != null) {
-                    connectionContext = eventHandler.createContext(inputProtocol, outputProtocol);
-                }
-                // we check stopped_ first to make sure we're not supposed to be
-                // shutting
-                // down. this is necessary for graceful shutdown.
-                while (true) {
-
-                    if (eventHandler != null) {
-                        eventHandler.processContext(connectionContext, inputTransport, outputTransport);
-                    }
-
-                    if (stopped_ || !processor.process(inputProtocol, outputProtocol)) {
-                        break;
-                    }
-                }
-            } catch (TSaslTransportException ttx) {
-                // Something thats not SASL was in the stream, continue silently
-            } catch (TTransportException ttx) {
-                // Assume the client died and continue silently
-            } catch (TException tx) {
-                LOGGER.error("Thrift error occurred during processing of message.", tx);
-            } catch (Exception x) {
-                LOGGER.error("Error occurred during processing of message.", x);
-            } finally {
-                if (eventHandler != null) {
-                    eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
-                }
-                if (inputTransport != null) {
-                    inputTransport.close();
-                }
-                if (outputTransport != null) {
-                    outputTransport.close();
-                }
-                if (client.isOpen()) {
-                    client.close();
-                }
-            }
-        }
-    }
 }
diff --git a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerSecurityTest.java b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerSecurityTest.java
index dd316f7..a85486d 100644
--- a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerSecurityTest.java
+++ b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerSecurityTest.java
@@ -45,8 +45,8 @@ public class ThriftConsumerSecurityTest extends CamelTestSupport {
     private static final int THRIFT_TEST_PORT = AvailablePortFinder.getNextAvailable();
     private static final int THRIFT_TEST_NUM1 = 12;
     private static final int THRIFT_TEST_NUM2 = 13;
-    private static final String TRUST_STORE_PATH = "src/test/resources/certs/truststore.jks";
-    private static final String KEY_STORE_PATH = "src/test/resources/certs/keystore.jks";
+    private static final String TRUST_STORE_RESOURCE = "file:src/test/resources/certs/truststore.jks";
+    private static final String KEY_STORE_RESOURCE = "file:src/test/resources/certs/keystore.jks";
     private static final String SECURITY_STORE_PASSWORD = "camelinaction";
     private static final int THRIFT_CLIENT_TIMEOUT = 2000;
     
@@ -62,7 +62,7 @@ public class ThriftConsumerSecurityTest extends CamelTestSupport {
             
             TSSLTransportFactory.TSSLTransportParameters sslParams = new TSSLTransportFactory.TSSLTransportParameters();
             
-            sslParams.setTrustStore(TRUST_STORE_PATH, SECURITY_STORE_PASSWORD);
+            sslParams.setTrustStore(TRUST_STORE_RESOURCE, SECURITY_STORE_PASSWORD);
             transport = TSSLTransportFactory.getClientSocket("localhost", THRIFT_TEST_PORT, THRIFT_CLIENT_TIMEOUT, sslParams);
             
             protocol = new TBinaryProtocol(transport);
@@ -86,7 +86,7 @@ public class ThriftConsumerSecurityTest extends CamelTestSupport {
         SSLContextParameters sslParameters = new SSLContextParameters();
         
         KeyStoreParameters keyStoreParams = new KeyStoreParameters();
-        keyStoreParams.setResource(KEY_STORE_PATH);
+        keyStoreParams.setResource(KEY_STORE_RESOURCE);
         keyStoreParams.setPassword(SECURITY_STORE_PASSWORD);
         
         KeyManagersParameters keyManagerParams = new KeyManagersParameters();
diff --git a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftProducerSecurityTest.java b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftProducerSecurityTest.java
index 9038065..bf503e7 100644
--- a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftProducerSecurityTest.java
+++ b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftProducerSecurityTest.java
@@ -59,8 +59,8 @@ public class ThriftProducerSecurityTest extends CamelTestSupport {
     private static final int THRIFT_TEST_NUM1 = 12;
     private static final int THRIFT_TEST_NUM2 = 13;
     
-    private static final String TRUST_STORE_PATH = "src/test/resources/certs/truststore.jks";
-    private static final String KEY_STORE_PATH = "src/test/resources/certs/keystore.jks";
+    private static final String TRUST_STORE_SOURCE = "file:src/test/resources/certs/truststore.jks";
+    private static final String KEY_STORE_SOURCE = "file:src/test/resources/certs/keystore.jks";
     private static final String SECURITY_STORE_PASSWORD = "camelinaction";
     private static final int THRIFT_CLIENT_TIMEOUT = 2000;
     
@@ -71,7 +71,7 @@ public class ThriftProducerSecurityTest extends CamelTestSupport {
         
         TSSLTransportFactory.TSSLTransportParameters sslParams = new TSSLTransportFactory.TSSLTransportParameters();
         
-        sslParams.setKeyStore(KEY_STORE_PATH, SECURITY_STORE_PASSWORD);
+        sslParams.setKeyStore(KEY_STORE_SOURCE, SECURITY_STORE_PASSWORD);
         serverTransport = TSSLTransportFactory.getServerSocket(THRIFT_TEST_PORT, THRIFT_CLIENT_TIMEOUT, InetAddress.getByName("localhost"), sslParams);
         TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport);
         args.processor(processor);
@@ -101,7 +101,7 @@ public class ThriftProducerSecurityTest extends CamelTestSupport {
         SSLContextParameters sslParameters = new SSLContextParameters();
         
         KeyStoreParameters keyStoreParams = new KeyStoreParameters();
-        keyStoreParams.setResource(TRUST_STORE_PATH);
+        keyStoreParams.setResource(TRUST_STORE_SOURCE);
         keyStoreParams.setPassword(SECURITY_STORE_PASSWORD);
         
         TrustManagersParameters trustManagerParams = new TrustManagersParameters();
diff --git a/parent/pom.xml b/parent/pom.xml
index 268c925..e35280d 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -393,7 +393,7 @@
     <jgroups-raft-jgroups-version>4.0.15.Final</jgroups-raft-jgroups-version>
     <jgroups-raft-leveldbjni-version>1.8</jgroups-raft-leveldbjni-version>
     <jgroups-raft-mapdb-version>1.0.8</jgroups-raft-mapdb-version>
-    <libthrift-version>0.11.0</libthrift-version>
+    <libthrift-version>0.12.0</libthrift-version>
     <jibx-version>1.2.6</jibx-version>
     <jing-bundle-version>20030619_5</jing-bundle-version>
     <jing-version>20030619</jing-version>