You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2019/05/30 17:29:53 UTC

[impala] 01/04: IMPALA-8538 (part 1) Copied THttp(Server|Transport) from thrift-0.9.3

This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9e13fd7de53978f7ef8543a3661ce70c5ed4d60a
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Wed May 8 12:33:54 2019 -0700

    IMPALA-8538 (part 1) Copied THttp(Server|Transport) from thrift-0.9.3
    
    This is a mechanical change that just copies several files over from
    thrift. This is for convenience in reviewing changes to these files,
    which have been submitted as a follow up patch.
    
    Change-Id: I1916e17eaeb7854eb93c2415396f0ee0243e4e32
    Reviewed-on: http://gerrit.cloudera.org:8080/13298
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/transport/CMakeLists.txt     |   4 +-
 be/src/transport/THttpServer.cpp    | 164 ++++++++++++++++++++++
 be/src/transport/THttpServer.h      |  64 +++++++++
 be/src/transport/THttpTransport.cpp | 267 ++++++++++++++++++++++++++++++++++++
 be/src/transport/THttpTransport.h   | 104 ++++++++++++++
 5 files changed, 602 insertions(+), 1 deletion(-)

diff --git a/be/src/transport/CMakeLists.txt b/be/src/transport/CMakeLists.txt
index 8a9eda8..59fca8b 100644
--- a/be/src/transport/CMakeLists.txt
+++ b/be/src/transport/CMakeLists.txt
@@ -24,10 +24,12 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/transport")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/transport")
 
 add_library(ThriftSaslTransport
+    THttpServer.cpp
+    THttpTransport.cpp
     TSaslClientTransport.cpp
     TSasl.cpp
     TSaslServerTransport.cpp
     TSaslTransport.cpp
     undef.cpp
   )
-add_dependencies(ThriftSaslTransport gen-deps)
\ No newline at end of file
+add_dependencies(ThriftSaslTransport gen-deps)
diff --git a/be/src/transport/THttpServer.cpp b/be/src/transport/THttpServer.cpp
new file mode 100644
index 0000000..a20d612
--- /dev/null
+++ b/be/src/transport/THttpServer.cpp
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <ctime>
+#include <iostream>
+#include <sstream>
+
+#include <thrift/transport/THttpServer.h>
+#include <thrift/transport/TSocket.h>
+#ifdef _MSC_VER
+#include <Shlwapi.h>
+#endif
+
+namespace apache {
+namespace thrift {
+namespace transport {
+
+using namespace std;
+
+/**
+ * Builds a server-side HTTP transport layered on top of 'transport'. All
+ * state setup is delegated to the THttpTransport base constructor.
+ */
+THttpServer::THttpServer(boost::shared_ptr<TTransport> transport)
+  : THttpTransport(transport) {}
+
+/** This class adds nothing of its own that needs releasing. */
+THttpServer::~THttpServer() {}
+
+// Portable aliases for the case-insensitive C string helpers used when parsing
+// headers: MSVC spells them _strnicmp / StrStrIA, other toolchains provide
+// strncasecmp / strcasestr.
+#ifndef _MSC_VER
+  #define THRIFT_strncasecmp(str1, str2, len) strncasecmp(str1, str2, len)
+  #define THRIFT_strcasestr(haystack, needle) strcasestr(haystack, needle)
+#else
+  #define THRIFT_strncasecmp(str1, str2, len) _strnicmp(str1, str2, len)
+  #define THRIFT_strcasestr(haystack, needle) StrStrIA(haystack, needle)
+#endif
+
+/**
+ * Reports whether the header name occupying the first 'len' bytes of 'header'
+ * is exactly 'name', compared case-insensitively.
+ */
+static bool headerNameEquals(const char* header, size_t len, const char* name) {
+  return len == strlen(name) && THRIFT_strncasecmp(header, name, len) == 0;
+}
+
+/**
+ * Interprets one request header line of the form "Name: value".
+ *
+ * Recognised headers:
+ *  - Transfer-Encoding: sets chunked_ when the value contains "chunked".
+ *  - Content-Length: clears chunked_ and stores the body length.
+ *  - X-Forwarded-For: stores the value in origin_.
+ * Lines without a colon, and all other headers, are ignored.
+ *
+ * @param header NUL-terminated header line.
+ * @throws TTransportException if Content-Length is negative.
+ */
+void THttpServer::parseHeader(char* header) {
+  const char* colon = strchr(header, ':');
+  if (colon == NULL) {
+    return;
+  }
+  const size_t sz = colon - header;
+  const char* value = colon + 1;
+
+  // The name must match in full. Comparing only the first 'sz' bytes would let
+  // an empty or truncated name (":x", "T: x") pass for any known header.
+  if (headerNameEquals(header, sz, "Transfer-Encoding")) {
+    if (THRIFT_strcasestr(value, "chunked") != NULL) {
+      chunked_ = true;
+    }
+  } else if (headerNameEquals(header, sz, "Content-length")) {
+    // A negative length would wrap to a huge unsigned value once stored.
+    const int length = atoi(value);
+    if (length < 0) {
+      throw TTransportException(string("Bad Content-Length: ") + value);
+    }
+    chunked_ = false;
+    contentLength_ = length;
+  } else if (headerNameEquals(header, sz, "X-Forwarded-For")) {
+    // Header field names are case-insensitive, so match this one like the others.
+    origin_ = value;
+  }
+}
+
+/**
+ * Parses the HTTP request line ("METHOD path HTTP-version") and decides
+ * whether the request is acceptable.
+ *
+ * POST is accepted as-is. OPTIONS is treated as a CORS preflight: a 200
+ * response carrying the Access-Control-* headers is written and flushed to the
+ * underlying transport right away. Any other method is rejected.
+ *
+ * @param status NUL-terminated request line; it is modified in place while
+ *               being split into tokens.
+ * @return true for POST and OPTIONS.
+ * @throws TTransportException if the separators between method, path and
+ *         version are missing, or the method is neither POST nor OPTIONS.
+ */
+bool THttpServer::parseStatusLine(char* status) {
+  char* method = status;
+
+  char* methodEnd = strchr(method, ' ');
+  if (methodEnd == NULL) {
+    throw TTransportException(string("Bad Status: ") + status);
+  }
+
+  // Terminate the method token, then skip any run of spaces before the path.
+  *methodEnd = '\0';
+  char* path = methodEnd + 1;
+  while (*path == ' ') {
+    ++path;
+  }
+
+  char* http = strchr(path, ' ');
+  if (http == NULL) {
+    // Undo the split so the message shows the whole line, not just the method.
+    *methodEnd = ' ';
+    throw TTransportException(string("Bad Status: ") + status);
+  }
+  *http = '\0';
+
+  if (strcmp(method, "POST") == 0) {
+    // POST method ok, looking for content.
+    return true;
+  } else if (strcmp(method, "OPTIONS") == 0) {
+    // Preflight OPTIONS request: answer immediately with whatever is currently
+    // held in the write buffer as the body.
+    uint8_t* buf;
+    uint32_t len;
+    writeBuffer_.getBuffer(&buf, &len);
+
+    // Construct the HTTP header. Content-Length is included because the
+    // connection is left open: without it the client has no way to tell where
+    // this response ends.
+    std::ostringstream h;
+    h << "HTTP/1.1 200 OK" << CRLF << "Date: " << getTimeRFC1123() << CRLF
+      << "Access-Control-Allow-Origin: *" << CRLF << "Access-Control-Allow-Methods: POST, OPTIONS"
+      << CRLF << "Access-Control-Allow-Headers: Content-Type" << CRLF
+      << "Content-Length: " << len << CRLF << CRLF;
+    string header = h.str();
+
+    // Write the header, then the data, then flush
+    transport_->write((const uint8_t*)header.c_str(), static_cast<uint32_t>(header.size()));
+    transport_->write(buf, len);
+    transport_->flush();
+
+    // Reset the buffer and header variables
+    writeBuffer_.resetBuffer();
+    readHeaders_ = true;
+    return true;
+  }
+  // 'status' is NUL-terminated after the method here, so only the method is reported.
+  throw TTransportException(string("Bad Status (unsupported method): ") + status);
+}
+
+/**
+ * Sends the buffered response: a "200 OK" header block whose Content-Length
+ * equals the number of bytes held in writeBuffer_, followed by those bytes.
+ * The underlying transport is then flushed, the write buffer is reset and
+ * readHeaders_ is set back to true.
+ */
+void THttpServer::flush() {
+  // Snapshot the pending payload.
+  uint8_t* payload;
+  uint32_t payloadLen;
+  writeBuffer_.getBuffer(&payload, &payloadLen);
+
+  // Assemble the response header block, one header per statement.
+  std::ostringstream headerStream;
+  headerStream << "HTTP/1.1 200 OK" << CRLF;
+  headerStream << "Date: " << getTimeRFC1123() << CRLF;
+  headerStream << "Server: Thrift/" << VERSION << CRLF;
+  headerStream << "Access-Control-Allow-Origin: *" << CRLF;
+  headerStream << "Content-Type: application/x-thrift" << CRLF;
+  headerStream << "Content-Length: " << payloadLen << CRLF;
+  headerStream << "Connection: Keep-Alive" << CRLF;
+  headerStream << CRLF;
+  const string headerText = headerStream.str();
+
+  // Headers first, then the body, then push everything out.
+  const uint8_t* headerBytes = reinterpret_cast<const uint8_t*>(headerText.c_str());
+  transport_->write(headerBytes, static_cast<uint32_t>(headerText.size()));
+  transport_->write(payload, payloadLen);
+  transport_->flush();
+
+  // Prepare for the next request.
+  writeBuffer_.resetBuffer();
+  readHeaders_ = true;
+}
+
+std::string THttpServer::getTimeRFC1123() {
+  static const char* Days[] = {"Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"};
+  static const char* Months[]
+      = {"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
+  char buff[128];
+  time_t t = time(NULL);
+  tm* broken_t = gmtime(&t);
+
+  sprintf(buff,
+          "%s, %d %s %d %d:%d:%d GMT",
+          Days[broken_t->tm_wday],
+          broken_t->tm_mday,
+          Months[broken_t->tm_mon],
+          broken_t->tm_year + 1900,
+          broken_t->tm_hour,
+          broken_t->tm_min,
+          broken_t->tm_sec);
+  return std::string(buff);
+}
+}
+}
+} // apache::thrift::transport
diff --git a/be/src/transport/THttpServer.h b/be/src/transport/THttpServer.h
new file mode 100644
index 0000000..a7ab944
--- /dev/null
+++ b/be/src/transport/THttpServer.h
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_THTTPSERVER_H_
+#define _THRIFT_TRANSPORT_THTTPSERVER_H_ 1
+
+#include <thrift/transport/THttpTransport.h>
+
+namespace apache {
+namespace thrift {
+namespace transport {
+
+/**
+ * Server side of the HTTP transport: supplies the request-line and header
+ * parsing hooks and frames outgoing data as an HTTP response.
+ */
+class THttpServer : public THttpTransport {
+public:
+  THttpServer(boost::shared_ptr<TTransport> transport);
+
+  virtual ~THttpServer();
+
+  /** Writes the buffered data to the wrapped transport, preceded by HTTP response headers. */
+  virtual void flush();
+
+protected:
+  // Header reading itself is inherited from THttpTransport; only the per-line
+  // hooks below are overridden. (A readHeaders() declaration here would hide
+  // the base implementation without providing one of its own.)
+
+  /** Handles a single "Name: value" request header line. */
+  virtual void parseHeader(char* header);
+
+  /** Validates the request line; returns true when the request should be processed. */
+  virtual bool parseStatusLine(char* status);
+
+  /** Returns the current time formatted for the HTTP Date header. */
+  std::string getTimeRFC1123();
+};
+
+/**
+ * Transport factory that layers the server-side HTTP transport over each
+ * transport it is handed.
+ */
+class THttpServerTransportFactory : public TTransportFactory {
+public:
+  THttpServerTransportFactory() {}
+
+  virtual ~THttpServerTransportFactory() {}
+
+  /**
+   * Returns a newly allocated THttpServer that wraps 'trans'.
+   */
+  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+    boost::shared_ptr<TTransport> wrapped(new THttpServer(trans));
+    return wrapped;
+  }
+};
+}
+}
+} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_THTTPSERVER_H_
diff --git a/be/src/transport/THttpTransport.cpp b/be/src/transport/THttpTransport.cpp
new file mode 100644
index 0000000..a466ff6
--- /dev/null
+++ b/be/src/transport/THttpTransport.cpp
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <sstream>
+
+#include <thrift/transport/THttpTransport.h>
+
+namespace apache {
+namespace thrift {
+namespace transport {
+
+using namespace std;
+
+// Out-of-class definitions of the static line-terminator constants: the CRLF text and its length.
+const char* THttpTransport::CRLF = "\r\n";
+const int THttpTransport::CRLF_LEN = 2;
+
+/**
+ * Wraps 'transport'. The object starts out expecting headers (readHeaders_ is
+ * true), with an empty parse buffer of capacity 1024 that init() allocates.
+ */
+THttpTransport::THttpTransport(boost::shared_ptr<TTransport> transport)
+  : transport_(transport),
+    origin_(),
+    readHeaders_(true),
+    chunked_(false),
+    chunkedDone_(false),
+    chunkSize_(0),
+    contentLength_(0),
+    httpBuf_(NULL),
+    httpPos_(0),
+    httpBufLen_(0),
+    httpBufSize_(1024) {
+  this->init();
+}
+
+/**
+ * Allocates the raw parse buffer (httpBufSize_ bytes plus one for the
+ * terminating NUL) and NUL-terminates it at the current length.
+ *
+ * @throws std::bad_alloc if the allocation fails.
+ */
+void THttpTransport::init() {
+  void* storage = std::malloc(httpBufSize_ + 1);
+  httpBuf_ = static_cast<char*>(storage);
+  if (storage == NULL) {
+    throw std::bad_alloc();
+  }
+  httpBuf_[httpBufLen_] = '\0';
+}
+
+/** Releases the parse buffer. */
+THttpTransport::~THttpTransport() {
+  // free() accepts a null pointer, so no explicit guard is required.
+  std::free(httpBuf_);
+}
+
+/**
+ * Copies up to 'len' decoded body bytes into 'buf'. When the internal read
+ * buffer is empty it is reset and refilled via readMoreData() first.
+ *
+ * @return the number of bytes delivered; 0 if readMoreData() produced nothing.
+ */
+uint32_t THttpTransport::read(uint8_t* buf, uint32_t len) {
+  const bool drained = (readBuffer_.available_read() == 0);
+  if (drained) {
+    readBuffer_.resetBuffer();
+    if (readMoreData() == 0) {
+      return 0;
+    }
+  }
+  return readBuffer_.read(buf, len);
+}
+
+/**
+ * For chunked messages, keeps calling readChunked() until chunkedDone_ is
+ * set, so any remaining chunks and footers are consumed. Always returns 0.
+ */
+uint32_t THttpTransport::readEnd() {
+  if (!chunked_) {
+    return 0;
+  }
+  while (!chunkedDone_) {
+    readChunked();
+  }
+  return 0;
+}
+
+/**
+ * Pulls more bytes from the wrapped transport, parses headers if a new message
+ * is starting, then moves body data into the read buffer: either one chunk
+ * (chunked encoding) or contentLength_ bytes.
+ *
+ * @return the number of body bytes made available.
+ */
+uint32_t THttpTransport::readMoreData() {
+  refill();
+
+  if (readHeaders_) {
+    readHeaders();
+  }
+
+  if (chunked_) {
+    return readChunked();
+  }
+
+  // A fixed-length body is read in one go, so the next call starts a new message.
+  const uint32_t consumed = readContent(contentLength_);
+  readHeaders_ = true;
+  return consumed;
+}
+
+/**
+ * Processes one chunk: reads the size line, then either the chunk payload plus
+ * the line ending that follows it, or, for a zero size, the footers.
+ *
+ * @return the number of payload bytes read (0 for the terminating chunk).
+ */
+uint32_t THttpTransport::readChunked() {
+  const uint32_t chunkSize = parseChunkSize(readLine());
+  if (chunkSize == 0) {
+    readChunkedFooters();
+    return 0;
+  }
+
+  const uint32_t delivered = readContent(chunkSize);
+  // Consume the CRLF that follows the chunk payload.
+  readLine();
+  return delivered;
+}
+
+/**
+ * Discards footer lines up to and including the first empty line, then sets
+ * chunkedDone_.
+ */
+void THttpTransport::readChunkedFooters() {
+  const char* footer;
+  do {
+    footer = readLine();
+  } while (footer[0] != '\0');
+  chunkedDone_ = true;
+}
+
+/**
+ * Extracts the hexadecimal chunk size from a chunk-size line. Anything from the
+ * first ';' onwards is cut off before parsing.
+ *
+ * @param line NUL-terminated chunk-size line; truncated in place at the first ';'.
+ * @return the chunk size in bytes.
+ * @throws TTransportException if the line does not start with a hexadecimal
+ *         number. Without this check a malformed line would read as size 0,
+ *         which the caller treats as the end of the body.
+ */
+uint32_t THttpTransport::parseChunkSize(char* line) {
+  char* semi = strchr(line, ';');
+  if (semi != NULL) {
+    *semi = '\0';
+  }
+  // "%x" stores into an unsigned int, so parse into that exact type.
+  unsigned int size = 0;
+  if (sscanf(line, "%x", &size) != 1) {
+    throw TTransportException(string("Bad chunk size: ") + line);
+  }
+  return size;
+}
+
+/**
+ * Moves exactly 'size' body bytes from the parse buffer into readBuffer_,
+ * refilling the parse buffer from the wrapped transport whenever it runs dry.
+ *
+ * @return 'size'.
+ */
+uint32_t THttpTransport::readContent(uint32_t size) {
+  for (uint32_t remaining = size; remaining > 0;) {
+    if (httpPos_ == httpBufLen_) {
+      // Everything buffered has been handed out: rewind and fetch more.
+      httpPos_ = 0;
+      httpBufLen_ = 0;
+      refill();
+    }
+
+    const uint32_t buffered = httpBufLen_ - httpPos_;
+    const uint32_t portion = (remaining < buffered) ? remaining : buffered;
+    readBuffer_.write(reinterpret_cast<uint8_t*>(httpBuf_ + httpPos_), portion);
+    httpPos_ += portion;
+    remaining -= portion;
+  }
+  return size;
+}
+
+/**
+ * Returns the next CRLF-terminated line from the parse buffer, reading more
+ * data until a complete line is available. The CRLF is overwritten with a NUL
+ * and httpPos_ is advanced past it.
+ *
+ * @return pointer into httpBuf_ at the start of the line.
+ */
+char* THttpTransport::readLine() {
+  char* terminator = strstr(httpBuf_ + httpPos_, CRLF);
+  while (terminator == NULL) {
+    // No complete line yet: compact what is left to the front and read more.
+    shift();
+    refill();
+    terminator = strstr(httpBuf_ + httpPos_, CRLF);
+  }
+
+  *terminator = '\0';
+  char* const lineStart = httpBuf_ + httpPos_;
+  httpPos_ = static_cast<uint32_t>((terminator - httpBuf_) + CRLF_LEN);
+  return lineStart;
+}
+
+/**
+ * Compacts the parse buffer: moves the unread bytes to the front, resets
+ * httpPos_ to 0 and re-terminates the buffer with a NUL.
+ */
+void THttpTransport::shift() {
+  const uint32_t pending = (httpBufLen_ > httpPos_) ? (httpBufLen_ - httpPos_) : 0;
+  if (pending > 0) {
+    memmove(httpBuf_, httpBuf_ + httpPos_, pending);
+  }
+  httpBufLen_ = pending;
+  httpPos_ = 0;
+  httpBuf_[httpBufLen_] = '\0';
+}
+
+/**
+ * Reads more bytes from the wrapped transport into the parse buffer, doubling
+ * the buffer first when a quarter or less of its capacity is free.
+ *
+ * @throws std::bad_alloc if the buffer cannot be grown.
+ * @throws TTransportException if the transport returned no data.
+ */
+void THttpTransport::refill() {
+  const uint32_t avail = httpBufSize_ - httpBufLen_;
+  if (avail <= (httpBufSize_ / 4)) {
+    const uint32_t grownSize = httpBufSize_ * 2;
+    // Doubling must not wrap around; a wrapped size would shrink the buffer
+    // below the data it already holds.
+    if (grownSize <= httpBufSize_) {
+      throw std::bad_alloc();
+    }
+    // Keep the old pointer until realloc succeeds. On failure realloc leaves
+    // the original block allocated, and it must stay reachable so the
+    // destructor can free it.
+    char* grown = static_cast<char*>(std::realloc(httpBuf_, grownSize + 1));
+    if (grown == NULL) {
+      throw std::bad_alloc();
+    }
+    httpBuf_ = grown;
+    httpBufSize_ = grownSize;
+  }
+
+  // Read more data
+  uint32_t got = transport_->read((uint8_t*)(httpBuf_ + httpBufLen_), httpBufSize_ - httpBufLen_);
+  httpBufLen_ += got;
+  httpBuf_[httpBufLen_] = '\0';
+
+  if (got == 0) {
+    throw TTransportException("Could not refill buffer");
+  }
+}
+
+/**
+ * Reads the start line and header lines of a message, dispatching them to
+ * parseStatusLine() and parseHeader(). Returns once a blank line is seen after
+ * a start line for which parseStatusLine() returned true; a blank line before
+ * that makes the next non-empty line be treated as a new start line.
+ */
+void THttpTransport::readHeaders() {
+  // Reset per-message state.
+  contentLength_ = 0;
+  chunked_ = false;
+  chunkedDone_ = false;
+  chunkSize_ = 0;
+
+  bool expectStartLine = true;
+  bool startLineAccepted = false;
+
+  for (;;) {
+    char* line = readLine();
+
+    if (line[0] == '\0') {
+      if (startLineAccepted) {
+        readHeaders_ = false;
+        return;
+      }
+      // Header block ended without an accepted start line (e.g. an interim
+      // HTTP 100 response): look for another start line.
+      expectStartLine = true;
+      continue;
+    }
+
+    if (expectStartLine) {
+      expectStartLine = false;
+      startLineAccepted = parseStatusLine(line);
+    } else {
+      parseHeader(line);
+    }
+  }
+}
+
+/**
+ * Appends 'len' bytes from 'buf' to writeBuffer_. The wrapped transport is not
+ * touched here.
+ */
+void THttpTransport::write(const uint8_t* buf, uint32_t len) {
+  this->writeBuffer_.write(buf, len);
+}
+
+/**
+ * Returns the origin of the wrapped transport, prefixed with "<origin_>, "
+ * when origin_ is non-empty.
+ */
+const std::string THttpTransport::getOrigin() {
+  std::ostringstream description;
+  const bool haveOwnOrigin = !origin_.empty();
+  if (haveOwnOrigin) {
+    description << origin_;
+    description << ", ";
+  }
+  description << transport_->getOrigin();
+  return description.str();
+}
+}
+}
+}
diff --git a/be/src/transport/THttpTransport.h b/be/src/transport/THttpTransport.h
new file mode 100644
index 0000000..a9f564c
--- /dev/null
+++ b/be/src/transport/THttpTransport.h
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_THTTPTRANSPORT_H_
+#define _THRIFT_TRANSPORT_THTTPTRANSPORT_H_ 1
+
+#include <thrift/transport/TBufferTransports.h>
+#include <thrift/transport/TVirtualTransport.h>
+
+namespace apache {
+namespace thrift {
+namespace transport {
+
+/**
+ * Shared base for the HTTP flavours of the thrift transport. It is a
+ * deliberately minimal HTTP/1.1 implementation that handles interim (HTTP 100)
+ * responses, chunked transfer encoding and fixed-length bodies. Subclasses
+ * supply the start-line and header parsing as well as flush(), which frames
+ * the buffered output.
+ */
+class THttpTransport : public TVirtualTransport<THttpTransport> {
+public:
+  THttpTransport(boost::shared_ptr<TTransport> transport);
+
+  virtual ~THttpTransport();
+
+  // open/isOpen/peek/close are forwarded straight to the wrapped transport.
+  void open() { transport_->open(); }
+
+  bool isOpen() { return transport_->isOpen(); }
+
+  bool peek() { return transport_->peek(); }
+
+  void close() { transport_->close(); }
+
+  /** Reads up to 'len' bytes of message body into 'buf'. */
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  /** Consumes any remaining chunked data; always returns 0. */
+  uint32_t readEnd();
+
+  /** Buffers 'len' bytes from 'buf' for the next flush(). */
+  void write(const uint8_t* buf, uint32_t len);
+
+  /** Sends the buffered output; the framing is defined by the subclass. */
+  virtual void flush() = 0;
+
+  /** Origin of the wrapped transport, prefixed by origin_ when that is set. */
+  virtual const std::string getOrigin();
+
+protected:
+  boost::shared_ptr<TTransport> transport_; // wrapped transport all I/O goes through
+  std::string origin_;                      // extra origin text reported by getOrigin()
+
+  TMemoryBuffer writeBuffer_; // outgoing body, held until flush()
+  TMemoryBuffer readBuffer_;  // decoded incoming body, served by read()
+
+  bool readHeaders_;       // true when the next data read must start with headers
+  bool chunked_;           // current message uses chunked transfer encoding
+  bool chunkedDone_;       // terminating chunk and footers have been consumed
+  uint32_t chunkSize_;
+  uint32_t contentLength_; // body length for non-chunked messages
+
+  char* httpBuf_;        // raw parse buffer, always NUL-terminated at httpBufLen_
+  uint32_t httpPos_;     // offset of the next unread byte in httpBuf_
+  uint32_t httpBufLen_;  // number of valid bytes in httpBuf_
+  uint32_t httpBufSize_; // capacity of httpBuf_, excluding the terminating NUL
+
+  virtual void init();
+
+  uint32_t readMoreData();
+  char* readLine();
+
+  void readHeaders();
+  virtual void parseHeader(char* header) = 0;
+  virtual bool parseStatusLine(char* status) = 0;
+
+  uint32_t readChunked();
+  void readChunkedFooters();
+  uint32_t parseChunkSize(char* line);
+
+  uint32_t readContent(uint32_t size);
+
+  void refill();
+  void shift();
+
+  static const char* CRLF;
+  static const int CRLF_LEN;
+
+private:
+  // This class owns httpBuf_ and frees it in the destructor, so a member-wise
+  // copy would release the same buffer twice. Copying is disabled by declaring
+  // these members without defining them.
+  THttpTransport(const THttpTransport&);
+  THttpTransport& operator=(const THttpTransport&);
+};
+}
+}
+} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_THTTPTRANSPORT_H_