You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2023/03/09 14:13:28 UTC

[impala] 06/07: IMPALA-11946: Add Thrift HTTP support for external frontend

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c09578a4840715854100b72b68b813107a3e25d4
Author: John Sherman <jf...@cloudera.com>
AuthorDate: Wed Feb 1 14:25:37 2023 -0800

    IMPALA-11946: Add Thrift HTTP support for external frontend
    
    - Add enable_external_fe_http flag that defaults to false
      - When true the external frontend service (external_fe_port) will
      expect clients to use http transport.
      - When false the external frontend service will expect binary
      transport.
    
    - Add tests for basic external frontend functionality
    - Add test to ensure the non-external frontend services do not expose
      the ExecutePlannedStatement interface.
    
    Change-Id: I2ad400b1df471e3d61b62d8c0360b27396c26050
    Reviewed-on: http://gerrit.cloudera.org:8080/19537
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Aman Sinha <am...@cloudera.com>
---
 be/src/service/impala-server.cc                    |  26 ++-
 .../impala/customcluster/CustomClusterRunner.java  |   6 +
 .../impala/customcluster/ExternalFrontendTest.java | 195 +++++++++++++++++++++
 3 files changed, 220 insertions(+), 7 deletions(-)

diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index c0add43c2..2e1046c1e 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -163,6 +163,9 @@ DEFINE_int32(external_fe_port, 0, "port on which External Frontend requests are
     "If 0 or less, the External Frontend server is not started. Careful consideration "
     "must be taken when enabling due to the fact that this port is currently always "
     "unauthenticated.");
+DEFINE_bool(enable_external_fe_http, false,
+    "if true enables http transport for external_fe_port otherwise binary transport is "
+    "used");
 
 DEFINE_int32(fe_service_threads, 64,
     "number of threads available to serve client requests");
@@ -3031,17 +3034,26 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
           new RpcEventHandler("external_frontend", exec_env_->metrics()));
       external_fe_processor->setEventHandler(event_handler);
 
+      ThriftServer::TransportType external_fe_port_transport =
+          ThriftServer::TransportType::BINARY;
+      if (FLAGS_enable_external_fe_http) {
+        LOG(INFO) << "External FE endpoint is using HTTP for transport";
+        external_fe_port_transport = ThriftServer::TransportType::HTTP;
+      }
+
       ThriftServerBuilder builder(EXTERNAL_FRONTEND_SERVER_NAME, external_fe_processor,
           external_fe_port);
       ThriftServer* server;
       RETURN_IF_ERROR(
-          builder.auth_provider(
-              AuthManager::GetInstance()->GetExternalFrontendAuthProvider())
-          .metrics(exec_env_->metrics())
-          .max_concurrent_connections(FLAGS_fe_service_threads)
-          .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
-          .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
-          .Build(&server));
+          builder
+              .auth_provider(
+                  AuthManager::GetInstance()->GetExternalFrontendAuthProvider())
+              .transport_type(external_fe_port_transport)
+              .metrics(exec_env_->metrics())
+              .max_concurrent_connections(FLAGS_fe_service_threads)
+              .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
+              .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
+              .Build(&server));
       external_fe_server_.reset(server);
       external_fe_server_->SetConnectionHandler(this);
     }
diff --git a/fe/src/test/java/org/apache/impala/customcluster/CustomClusterRunner.java b/fe/src/test/java/org/apache/impala/customcluster/CustomClusterRunner.java
index a2547a8a1..ecc1924f2 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/CustomClusterRunner.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/CustomClusterRunner.java
@@ -63,6 +63,12 @@ class CustomClusterRunner {
         impaladArgs, catalogdArgs, statestoredArgs, new HashMap<String, String>(), "");
   }
 
+  public static int StartImpalaCluster(String impaladArgs, String catalogdArgs,
+      String statestoredArgs, String startArgs) throws IOException, InterruptedException {
+    return StartImpalaCluster(impaladArgs, catalogdArgs, statestoredArgs,
+        new HashMap<String, String>(), startArgs);
+  }
+
   /**
    * Starts Impala, setting environment variables in 'env', and passing 'impalad_args',
    * 'catalogd_args', 'statestored_args', and 'startArgs' to start-impala-cluster.py.
diff --git a/fe/src/test/java/org/apache/impala/customcluster/ExternalFrontendTest.java b/fe/src/test/java/org/apache/impala/customcluster/ExternalFrontendTest.java
new file mode 100644
index 000000000..dc9866649
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/customcluster/ExternalFrontendTest.java
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.customcluster;
+
+import java.util.List;
+
+import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
+import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementReq;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementResp;
+import org.apache.hive.service.rpc.thrift.TFetchOrientation;
+import org.apache.hive.service.rpc.thrift.TFetchResultsReq;
+import org.apache.hive.service.rpc.thrift.TFetchResultsResp;
+import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
+import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TStatus;
+import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.impala.catalog.Catalog;
+import org.apache.impala.common.FrontendFixture;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.service.Frontend.PlanCtx;
+import org.apache.impala.service.Frontend;
+import org.apache.impala.testutil.TestUtils;
+import org.apache.impala.thrift.ImpalaHiveServer2Service;
+import org.apache.impala.thrift.TExecRequest;
+import org.apache.impala.thrift.TExecutePlannedStatementReq;
+import org.apache.impala.thrift.TQueryCtx;
+import org.apache.impala.thrift.TQueryOptions;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.THttpClient;
+import org.apache.thrift.transport.TSocket;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ExternalFrontendTest {
+  private final FrontendFixture feFixture_ = FrontendFixture.instance();
+  private final Frontend frontend_ = feFixture_.frontend();
+  private final int externalFePort = 21159;
+  private final int hs2BinaryPort = 21050;
+  private final int hs2HttpPort = 28000;
+
+  void setup(int port, boolean isHttp) throws Exception {
+    String impaladFlags = "--external_fe_port=" + port;
+    if (isHttp) {
+      impaladFlags += " --enable_external_fe_http";
+    }
+    // Start the impala cluster with the first impalad configured with external frontend
+    // arguments
+    int ret = CustomClusterRunner.StartImpalaCluster(
+        "", "", "", "--per_impalad_args=" + impaladFlags);
+    Assert.assertEquals(
+        "custom cluster failed to start with args: " + impaladFlags, ret, 0);
+  }
+
+  void setupExternalFe() throws Exception { setup(externalFePort, false); }
+
+  void setupExternalFeHttp() throws Exception { setup(externalFePort, true); }
+
+  ImpalaHiveServer2Service.Client createBinaryClient(int port) throws Exception {
+    // Create a binary connection against the hs2 port
+    TSocket sock = new TSocket("localhost", port);
+    sock.open();
+    return new ImpalaHiveServer2Service.Client(new TBinaryProtocol(sock));
+  }
+
+  ImpalaHiveServer2Service.Client createHttpClient(int port) throws Exception {
+    String host_url = "http://localhost:" + port + "/cliservice";
+    THttpClient client = new THttpClient(host_url);
+    return new ImpalaHiveServer2Service.Client(new TBinaryProtocol(client));
+  }
+
+  static TStatus verifySuccess(TStatus status) throws Exception {
+    if (status.getStatusCode() == TStatusCode.SUCCESS_STATUS
+        || status.getStatusCode() == TStatusCode.SUCCESS_WITH_INFO_STATUS) {
+      return status;
+    }
+    throw new Exception(status.toString());
+  }
+
+  void executeTestQuery(ImpalaHiveServer2Service.Client client) throws Exception {
+    executeTestQuery(client, false);
+  }
+
+  TStatus executeTestQueryExpectFailure(ImpalaHiveServer2Service.Client client)
+      throws Exception {
+    return executeTestQuery(client, true);
+  }
+
+  TStatus executeTestQuery(ImpalaHiveServer2Service.Client client,
+      boolean shouldFailExecute) throws Exception {
+    String testStmt = "SELECT 'this is a test, this is only a test'";
+    String expectedValue = "this is a test, this is only a test";
+
+    // Create the TExecRequest
+    TQueryOptions options = new TQueryOptions();
+    options.setExec_single_node_rows_threshold(0);
+
+    TQueryCtx queryCtx =
+        TestUtils.createQueryContext(Catalog.DEFAULT_DB, System.getProperty("user.name"));
+    queryCtx.client_request.setStmt(testStmt);
+    queryCtx.client_request.query_options = options;
+
+    TExecRequest request = null;
+    try {
+      request = frontend_.createExecRequest(new PlanCtx(queryCtx));
+    } catch (ImpalaException e) {
+      Assert.fail(
+          "Failed to create exec request for '" + testStmt + "': " + e.getMessage());
+    }
+
+    // Open Session
+    TOpenSessionResp openResp = client.OpenSession(new TOpenSessionReq());
+    verifySuccess(openResp.getStatus());
+
+    // Create TExecutePlannedStatementReq
+    TExecuteStatementReq executeReq = new TExecuteStatementReq();
+    executeReq.setSessionHandle(openResp.getSessionHandle());
+    executeReq.setStatement(testStmt);
+    TExecutePlannedStatementReq executePlannedReq = new TExecutePlannedStatementReq();
+    executePlannedReq.setStatementReq(executeReq);
+    executePlannedReq.setPlan(request);
+
+    // Execute and Fetch
+    TExecuteStatementResp execResp = client.ExecutePlannedStatement(executePlannedReq);
+    if (shouldFailExecute) {
+      return execResp.getStatus();
+    }
+    verifySuccess(execResp.getStatus());
+
+    TFetchResultsReq fetchReq = new TFetchResultsReq(
+        execResp.getOperationHandle(), TFetchOrientation.FETCH_NEXT, 1000);
+    TFetchResultsResp fetchResp = client.FetchResults(fetchReq);
+    verifySuccess(fetchResp.getStatus());
+
+    // Verify Results
+    List<TColumn> columns = fetchResp.getResults().getColumns();
+    Assert.assertEquals(1, columns.size());
+    Assert.assertEquals(expectedValue, columns.get(0).getStringVal().getValues().get(0));
+
+    // Close Session
+    TCloseSessionResp closeResp =
+        client.CloseSession(new TCloseSessionReq(openResp.getSessionHandle()));
+    return verifySuccess(closeResp.getStatus());
+  }
+
+  @Test
+  public void testExternalFrontendBinary() throws Exception {
+    setupExternalFe();
+    executeTestQuery(createBinaryClient(externalFePort));
+  }
+
+  @Test
+  public void testExternalFrontendHttp() throws Exception {
+    setupExternalFeHttp();
+    executeTestQuery(createHttpClient(externalFePort));
+  }
+
+  @Test
+  public void testExecutePlannedStatementDisallowedNonExternalFe() throws Exception {
+    setupExternalFe();
+    // Try to execute a planned query against the hs2 service (it should fail)
+    TStatus status = executeTestQueryExpectFailure(createBinaryClient(hs2BinaryPort));
+    Assert.assertEquals(status.getStatusCode(), TStatusCode.ERROR_STATUS);
+    Assert.assertTrue(status.toString().contains("Unsupported operation"));
+
+    // Now try to execute against the hs2 http service (it should also fail)
+    status = executeTestQueryExpectFailure(createHttpClient(hs2HttpPort));
+    Assert.assertEquals(status.getStatusCode(), TStatusCode.ERROR_STATUS);
+    Assert.assertTrue(status.toString().contains("Unsupported operation"));
+  }
+
+  @After
+  public void cleanUp() throws Exception {
+    // Restore cluster to state before the test
+    CustomClusterRunner.StartImpalaCluster();
+  }
+}