You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2023/03/09 14:13:28 UTC
[impala] 06/07: IMPALA-11946: Add Thrift HTTP support for external frontend
This is an automated email from the ASF dual-hosted git repository.
csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit c09578a4840715854100b72b68b813107a3e25d4
Author: John Sherman <jf...@cloudera.com>
AuthorDate: Wed Feb 1 14:25:37 2023 -0800
IMPALA-11946: Add Thrift HTTP support for external frontend
- Add enable_external_fe_http flag that defaults to false
- When true the external frontend service (external_fe_port) will
expect clients to use http transport.
- When false the external frontend service will expect binary
transport.
- Add tests for basic external frontend functionality
- Add test to ensure the non-external frontend services do not expose
the ExecutePlannedStatement interface.
Change-Id: I2ad400b1df471e3d61b62d8c0360b27396c26050
Reviewed-on: http://gerrit.cloudera.org:8080/19537
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Aman Sinha <am...@cloudera.com>
---
be/src/service/impala-server.cc | 26 ++-
.../impala/customcluster/CustomClusterRunner.java | 6 +
.../impala/customcluster/ExternalFrontendTest.java | 195 +++++++++++++++++++++
3 files changed, 220 insertions(+), 7 deletions(-)
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index c0add43c2..2e1046c1e 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -163,6 +163,9 @@ DEFINE_int32(external_fe_port, 0, "port on which External Frontend requests are
"If 0 or less, the External Frontend server is not started. Careful consideration "
"must be taken when enabling due to the fact that this port is currently always "
"unauthenticated.");
+// Read once in ImpalaServer::Start() when the External Frontend ThriftServer is built:
+// true selects ThriftServer::TransportType::HTTP, false keeps the BINARY transport.
+DEFINE_bool(enable_external_fe_http, false,
+ "if true enables http transport for external_fe_port otherwise binary transport is "
+ "used");
DEFINE_int32(fe_service_threads, 64,
"number of threads available to serve client requests");
@@ -3031,17 +3034,26 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
new RpcEventHandler("external_frontend", exec_env_->metrics()));
external_fe_processor->setEventHandler(event_handler);
+ ThriftServer::TransportType external_fe_port_transport =
+ ThriftServer::TransportType::BINARY;
+ if (FLAGS_enable_external_fe_http) {
+ LOG(INFO) << "External FE endpoint is using HTTP for transport";
+ external_fe_port_transport = ThriftServer::TransportType::HTTP;
+ }
+
ThriftServerBuilder builder(EXTERNAL_FRONTEND_SERVER_NAME, external_fe_processor,
external_fe_port);
ThriftServer* server;
RETURN_IF_ERROR(
- builder.auth_provider(
- AuthManager::GetInstance()->GetExternalFrontendAuthProvider())
- .metrics(exec_env_->metrics())
- .max_concurrent_connections(FLAGS_fe_service_threads)
- .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
- .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
- .Build(&server));
+ builder
+ .auth_provider(
+ AuthManager::GetInstance()->GetExternalFrontendAuthProvider())
+ .transport_type(external_fe_port_transport)
+ .metrics(exec_env_->metrics())
+ .max_concurrent_connections(FLAGS_fe_service_threads)
+ .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
+ .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
+ .Build(&server));
external_fe_server_.reset(server);
external_fe_server_->SetConnectionHandler(this);
}
diff --git a/fe/src/test/java/org/apache/impala/customcluster/CustomClusterRunner.java b/fe/src/test/java/org/apache/impala/customcluster/CustomClusterRunner.java
index a2547a8a1..ecc1924f2 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/CustomClusterRunner.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/CustomClusterRunner.java
@@ -63,6 +63,12 @@ class CustomClusterRunner {
impaladArgs, catalogdArgs, statestoredArgs, new HashMap<String, String>(), "");
}
+ public static int StartImpalaCluster(String impaladArgs, String catalogdArgs,
+ String statestoredArgs, String startArgs) throws IOException, InterruptedException {
+ return StartImpalaCluster(impaladArgs, catalogdArgs, statestoredArgs,
+ new HashMap<String, String>(), startArgs);
+ }
+
/**
* Starts Impala, setting environment variables in 'env', and passing 'impalad_args',
* 'catalogd_args', 'statestored_args', and 'startArgs' to start-impala-cluster.py.
diff --git a/fe/src/test/java/org/apache/impala/customcluster/ExternalFrontendTest.java b/fe/src/test/java/org/apache/impala/customcluster/ExternalFrontendTest.java
new file mode 100644
index 000000000..dc9866649
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/customcluster/ExternalFrontendTest.java
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.customcluster;
+
+import java.util.List;
+
+import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
+import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementReq;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementResp;
+import org.apache.hive.service.rpc.thrift.TFetchOrientation;
+import org.apache.hive.service.rpc.thrift.TFetchResultsReq;
+import org.apache.hive.service.rpc.thrift.TFetchResultsResp;
+import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
+import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TStatus;
+import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.impala.catalog.Catalog;
+import org.apache.impala.common.FrontendFixture;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.service.Frontend.PlanCtx;
+import org.apache.impala.service.Frontend;
+import org.apache.impala.testutil.TestUtils;
+import org.apache.impala.thrift.ImpalaHiveServer2Service;
+import org.apache.impala.thrift.TExecRequest;
+import org.apache.impala.thrift.TExecutePlannedStatementReq;
+import org.apache.impala.thrift.TQueryCtx;
+import org.apache.impala.thrift.TQueryOptions;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.THttpClient;
+import org.apache.thrift.transport.TSocket;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Custom cluster tests for the external frontend service. Each test starts a cluster
+ * with '--external_fe_port' set, plans a fixed query locally through the test Frontend
+ * and submits the resulting plan with the ExecutePlannedStatement RPC.
+ */
+public class ExternalFrontendTest {
+  // Port passed to impalad through --external_fe_port.
+  private static final int EXTERNAL_FE_PORT = 21159;
+  // Regular hs2 binary and hs2 http ports; the tests never set these explicitly, so
+  // they are presumably the cluster defaults.
+  private static final int HS2_BINARY_PORT = 21050;
+  private static final int HS2_HTTP_PORT = 28000;
+
+  private final FrontendFixture feFixture_ = FrontendFixture.instance();
+  private final Frontend frontend_ = feFixture_.frontend();
+
+  /**
+   * Starts the cluster with '--external_fe_port=port' and, when 'isHttp' is true,
+   * '--enable_external_fe_http'. Fails the test if the cluster start script returns a
+   * non-zero value.
+   */
+  void setup(int port, boolean isHttp) throws Exception {
+    String impaladFlags = "--external_fe_port=" + port;
+    if (isHttp) {
+      impaladFlags += " --enable_external_fe_http";
+    }
+    // Start the impala cluster with the first impalad configured with external frontend
+    // arguments
+    int ret = CustomClusterRunner.StartImpalaCluster(
+        "", "", "", "--per_impalad_args=" + impaladFlags);
+    // JUnit expects (message, expected, actual); keeping that order makes the failure
+    // text report the real return code as the actual value.
+    Assert.assertEquals(
+        "custom cluster failed to start with args: " + impaladFlags, 0, ret);
+  }
+
+  /** Starts the cluster with the external frontend port using binary transport. */
+  void setupExternalFe() throws Exception { setup(EXTERNAL_FE_PORT, false); }
+
+  /** Starts the cluster with the external frontend port using HTTP transport. */
+  void setupExternalFeHttp() throws Exception { setup(EXTERNAL_FE_PORT, true); }
+
+  /**
+   * Opens a socket to localhost:'port' and returns a client speaking the binary
+   * protocol over it. The caller should release the socket with closeClient().
+   */
+  ImpalaHiveServer2Service.Client createBinaryClient(int port) throws Exception {
+    TSocket sock = new TSocket("localhost", port);
+    sock.open();
+    return new ImpalaHiveServer2Service.Client(new TBinaryProtocol(sock));
+  }
+
+  /**
+   * Returns a client that sends binary-protocol messages over HTTP to the
+   * '/cliservice' path on localhost:'port'. The caller should release it with
+   * closeClient().
+   */
+  ImpalaHiveServer2Service.Client createHttpClient(int port) throws Exception {
+    String hostUrl = "http://localhost:" + port + "/cliservice";
+    THttpClient transport = new THttpClient(hostUrl);
+    return new ImpalaHiveServer2Service.Client(new TBinaryProtocol(transport));
+  }
+
+  /**
+   * Closes the transport underneath 'client'. Both factory methods above build the
+   * client from a single protocol, so the input protocol's transport is the only one.
+   */
+  private static void closeClient(ImpalaHiveServer2Service.Client client) {
+    client.getInputProtocol().getTransport().close();
+  }
+
+  /**
+   * Returns 'status' when its code is SUCCESS_STATUS or SUCCESS_WITH_INFO_STATUS;
+   * otherwise throws an Exception whose message is the status rendered as a string.
+   */
+  static TStatus verifySuccess(TStatus status) throws Exception {
+    if (status.getStatusCode() == TStatusCode.SUCCESS_STATUS
+        || status.getStatusCode() == TStatusCode.SUCCESS_WITH_INFO_STATUS) {
+      return status;
+    }
+    throw new Exception(status.toString());
+  }
+
+  /** Closes the session opened by 'openResp' and verifies that the close succeeded. */
+  private static TStatus closeSession(ImpalaHiveServer2Service.Client client,
+      TOpenSessionResp openResp) throws Exception {
+    TCloseSessionResp closeResp =
+        client.CloseSession(new TCloseSessionReq(openResp.getSessionHandle()));
+    return verifySuccess(closeResp.getStatus());
+  }
+
+  /** Runs the test query through 'client' and expects every step to succeed. */
+  void executeTestQuery(ImpalaHiveServer2Service.Client client) throws Exception {
+    executeTestQuery(client, false);
+  }
+
+  /**
+   * Runs the test query through 'client' and returns the status of the
+   * ExecutePlannedStatement call without verifying it, so the caller can inspect the
+   * failure.
+   */
+  TStatus executeTestQueryExpectFailure(ImpalaHiveServer2Service.Client client)
+      throws Exception {
+    return executeTestQuery(client, true);
+  }
+
+  /**
+   * Plans a fixed SELECT with the local frontend, opens a session on 'client' and
+   * submits the plan through ExecutePlannedStatement.
+   *
+   * When 'shouldFailExecute' is true the session is closed and the execute status is
+   * returned as-is. Otherwise the execute status is verified, the single result value
+   * is fetched and compared with the expected string, and the status of the final
+   * CloseSession call is returned.
+   */
+  TStatus executeTestQuery(ImpalaHiveServer2Service.Client client,
+      boolean shouldFailExecute) throws Exception {
+    String testStmt = "SELECT 'this is a test, this is only a test'";
+    String expectedValue = "this is a test, this is only a test";
+
+    // Create the TExecRequest
+    TQueryOptions options = new TQueryOptions();
+    options.setExec_single_node_rows_threshold(0);
+
+    TQueryCtx queryCtx =
+        TestUtils.createQueryContext(Catalog.DEFAULT_DB, System.getProperty("user.name"));
+    queryCtx.client_request.setStmt(testStmt);
+    queryCtx.client_request.query_options = options;
+
+    TExecRequest request;
+    try {
+      request = frontend_.createExecRequest(new PlanCtx(queryCtx));
+    } catch (ImpalaException e) {
+      // Same failure type and message as Assert.fail(), but the planner exception is
+      // kept as the cause so its stack trace shows up in the test report.
+      throw new AssertionError(
+          "Failed to create exec request for '" + testStmt + "': " + e.getMessage(), e);
+    }
+
+    // Open Session
+    TOpenSessionResp openResp = client.OpenSession(new TOpenSessionReq());
+    verifySuccess(openResp.getStatus());
+
+    // Create TExecutePlannedStatementReq
+    TExecuteStatementReq executeReq = new TExecuteStatementReq();
+    executeReq.setSessionHandle(openResp.getSessionHandle());
+    executeReq.setStatement(testStmt);
+    TExecutePlannedStatementReq executePlannedReq = new TExecutePlannedStatementReq();
+    executePlannedReq.setStatementReq(executeReq);
+    executePlannedReq.setPlan(request);
+
+    // Execute and Fetch
+    TExecuteStatementResp execResp = client.ExecutePlannedStatement(executePlannedReq);
+    if (shouldFailExecute) {
+      // The caller only inspects the execute status; close the session first so the
+      // expected-failure path does not leave it open on the server.
+      closeSession(client, openResp);
+      return execResp.getStatus();
+    }
+    verifySuccess(execResp.getStatus());
+
+    TFetchResultsReq fetchReq = new TFetchResultsReq(
+        execResp.getOperationHandle(), TFetchOrientation.FETCH_NEXT, 1000);
+    TFetchResultsResp fetchResp = client.FetchResults(fetchReq);
+    verifySuccess(fetchResp.getStatus());
+
+    // Verify Results
+    List<TColumn> columns = fetchResp.getResults().getColumns();
+    Assert.assertEquals(1, columns.size());
+    Assert.assertEquals(expectedValue, columns.get(0).getStringVal().getValues().get(0));
+
+    // Close Session
+    return closeSession(client, openResp);
+  }
+
+  /**
+   * Asserts that a planned statement sent through 'client' is rejected with
+   * ERROR_STATUS and an "Unsupported operation" message, then closes the client.
+   */
+  private void assertPlannedStatementRejected(ImpalaHiveServer2Service.Client client)
+      throws Exception {
+    try {
+      TStatus status = executeTestQueryExpectFailure(client);
+      Assert.assertEquals(TStatusCode.ERROR_STATUS, status.getStatusCode());
+      Assert.assertTrue("unexpected status: " + status,
+          status.toString().contains("Unsupported operation"));
+    } finally {
+      closeClient(client);
+    }
+  }
+
+  @Test
+  public void testExternalFrontendBinary() throws Exception {
+    setupExternalFe();
+    ImpalaHiveServer2Service.Client client = createBinaryClient(EXTERNAL_FE_PORT);
+    try {
+      executeTestQuery(client);
+    } finally {
+      closeClient(client);
+    }
+  }
+
+  @Test
+  public void testExternalFrontendHttp() throws Exception {
+    setupExternalFeHttp();
+    ImpalaHiveServer2Service.Client client = createHttpClient(EXTERNAL_FE_PORT);
+    try {
+      executeTestQuery(client);
+    } finally {
+      closeClient(client);
+    }
+  }
+
+  @Test
+  public void testExecutePlannedStatementDisallowedNonExternalFe() throws Exception {
+    setupExternalFe();
+    // Try to execute a planned query against the hs2 service (it should fail)
+    assertPlannedStatementRejected(createBinaryClient(HS2_BINARY_PORT));
+
+    // Now try to execute against the hs2 http service (it should also fail)
+    assertPlannedStatementRejected(createHttpClient(HS2_HTTP_PORT));
+  }
+
+  @After
+  public void cleanUp() throws Exception {
+    // Restore cluster to state before the test
+    CustomClusterRunner.StartImpalaCluster();
+  }
+}