You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2013/03/14 04:19:27 UTC

git commit: TS-1730: Supporting First Byte Flush for ESI plugin

Updated Branches:
  refs/heads/master cbfba1aeb -> 522ec6ccf


TS-1730: Supporting First Byte Flush for ESI plugin


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/522ec6cc
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/522ec6cc
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/522ec6cc

Branch: refs/heads/master
Commit: 522ec6ccf3fdbdebc5b8a71474f752af5d37a1f0
Parents: cbfba1a
Author: Kit Chan <ch...@gmail.com>
Authored: Tue Feb 26 21:44:28 2013 -0800
Committer: James Peach <jp...@apache.org>
Committed: Wed Mar 13 19:52:15 2013 -0700

----------------------------------------------------------------------
 CHANGES                                      |    5 +-
 plugins/experimental/esi/Makefile.am         |    2 +
 plugins/experimental/esi/README              |    3 +-
 plugins/experimental/esi/lib/EsiGzip.cc      |  163 ++++++++++++++++
 plugins/experimental/esi/lib/EsiGzip.h       |   60 ++++++
 plugins/experimental/esi/lib/EsiProcessor.cc |  210 +++++++++++++++++++++
 plugins/experimental/esi/lib/EsiProcessor.h  |    9 +
 plugins/experimental/esi/plugin.cc           |  144 +++++++++++++--
 8 files changed, 581 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/522ec6cc/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 111a9b0..c6c8c96 100644
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,9 @@
   Changes with Apache Traffic Server 3.3.2
 
 
+  *) [TS-1730] Supporting First Byte Flush for ESI plugin
+   Author: Shu Kit Chan <ch...@gmail.com>
+
   *) [TS-1745] fix typos
    Author: Benjamin Kerensa <bk...@ubuntu.com>
 
@@ -15,7 +18,7 @@
 
   *) [TS-1356] Ability to set thread affinity with multiple modes
 
-  
+
   Changes with Apache Traffic Server 3.3.1
 
   *) [TS-1743] Implement our own hash mechanism for traffic_logstats, since

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/522ec6cc/plugins/experimental/esi/Makefile.am
----------------------------------------------------------------------
diff --git a/plugins/experimental/esi/Makefile.am b/plugins/experimental/esi/Makefile.am
index 27c1c30..39e2af8 100644
--- a/plugins/experimental/esi/Makefile.am
+++ b/plugins/experimental/esi/Makefile.am
@@ -33,6 +33,7 @@ check_PROGRAMS = docnode_test parser_test processor_test utils_test vars_test
 libesi_la_SOURCES = \
 	lib/DocNode.cc \
 	lib/EsiParser.cc \
+	lib/EsiGzip.cc \
 	lib/EsiProcessor.cc \
 	lib/Expression.cc \
 	lib/FailureInfo.cc \
@@ -45,6 +46,7 @@ libesi_la_SOURCES = \
 libtest_la_SOURCES = \
 	lib/DocNode.cc \
 	lib/EsiParser.cc \
+	lib/EsiGzip.cc \
 	lib/EsiProcessor.cc \
 	lib/Expression.cc \
 	lib/FailureInfo.cc \

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/522ec6cc/plugins/experimental/esi/README
----------------------------------------------------------------------
diff --git a/plugins/experimental/esi/README b/plugins/experimental/esi/README
index 30ace69..abb1446 100644
--- a/plugins/experimental/esi/README
+++ b/plugins/experimental/esi/README
@@ -47,10 +47,11 @@ Enabling ESI
 
 esi.so
 
-There are three options you can add. 
+There are four options you can add. 
   "--private-response" will add private cache control and expires header to the processed ESI document. 
   "--packed-node-support" will enable the support for using packed node, which will improve the performance of parsing cached ESI document. 
   "--disable-gzip-output" will disable gzipped output, which will NOT gzip the output anyway.
+  "--first-byte-flush" will enable the first byte flush feature, which will flush content to users as soon as the entire ESI document is received and parsed without all ESI includes fetched (the flushing will stop at the ESI include markup till that include is fetched). 
 
 2) We need a mapping for origin server response that contains the ESI markup. Assume that the ATS server is abc.com. And your origin server is xyz.com and the response containing ESI markup is http://xyz.com/esi.php. We will need the following line in /usr/local/etc/trafficserver/remap.config
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/522ec6cc/plugins/experimental/esi/lib/EsiGzip.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/esi/lib/EsiGzip.cc b/plugins/experimental/esi/lib/EsiGzip.cc
new file mode 100644
index 0000000..312ed30
--- /dev/null
+++ b/plugins/experimental/esi/lib/EsiGzip.cc
@@ -0,0 +1,163 @@
+/** @file
+
+  A brief file description
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#include "EsiGzip.h"
+#include "gzip.h"
+#include <ctype.h>
+#include <stdint.h>
+
+using std::string;
+using namespace EsiLib;
+
+static const int COMPRESSION_LEVEL = 6;
+static const int ZLIB_MEM_LEVEL = 8;
+
+static const int GZIP_HEADER_SIZE = 10;
+static const int GZIP_TRAILER_SIZE = 8;
+
+static const char MAGIC_BYTE_1 = 0x1f;
+static const char MAGIC_BYTE_2 = 0x8b;
+static const char OS_TYPE = 3; // Unix
+
+static const int BUF_SIZE = 1 << 15; // 32k buffer
+
+EsiGzip::EsiGzip(const char *debug_tag,
+                           ComponentBase::Debug debug_func, ComponentBase::Error error_func)
+  : ComponentBase(debug_tag, debug_func, error_func),
+    _downstream_length(0), 
+    _total_data_length(0) {
+}
+
+template<typename T>
+inline void append(string &out, T data) {
+  for (unsigned int i = 0; i < sizeof(data); ++i) {
+    out += static_cast<char>(data & 0xff);
+    data = data >> 8;
+  }
+}
+
+inline int runDeflateLoop(z_stream &zstrm, int flush, std::string &cdata) {
+  char buf[BUF_SIZE];
+  int deflate_result = Z_OK;
+  do {
+    zstrm.next_out = reinterpret_cast<Bytef *>(buf);
+    zstrm.avail_out = BUF_SIZE;
+    deflate_result = deflate(&zstrm, flush);
+    if ((deflate_result == Z_OK) || (deflate_result == Z_STREAM_END)) {
+      cdata.append(buf, BUF_SIZE - zstrm.avail_out);
+      if ((deflate_result == Z_STREAM_END) || zstrm.avail_out > 6) {
+        break;
+      }
+    } else {
+      break;
+    }
+  } while (true);
+  return deflate_result;
+}
+
+bool
+EsiGzip::stream_encode(const char *data, int data_len, std::string &cdata) {
+  if(_downstream_length == 0) {
+    cdata.assign(GZIP_HEADER_SIZE, 0); // reserving space for the header
+    cdata[0] = MAGIC_BYTE_1;
+    cdata[1] = MAGIC_BYTE_2;
+    cdata[2] = Z_DEFLATED;
+    cdata[9] = OS_TYPE;
+
+    //_zstrm.zalloc = Z_NULL;
+    //_zstrm.zfree = Z_NULL;
+    //_zstrm.opaque = Z_NULL;
+    //if (deflateInit2(&_zstrm, COMPRESSION_LEVEL, Z_DEFLATED, -MAX_WBITS,
+    //               ZLIB_MEM_LEVEL, Z_DEFAULT_STRATEGY) != Z_OK) {
+    //  _errorLog("[%s] deflateInit2 failed!", __FUNCTION__);
+    //  return false;
+    //}
+
+    _crc = crc32(0, Z_NULL, 0);
+  }
+  
+    _zstrm.zalloc = Z_NULL;
+    _zstrm.zfree = Z_NULL;
+    _zstrm.opaque = Z_NULL;
+    if (deflateInit2(&_zstrm, COMPRESSION_LEVEL, Z_DEFLATED, -MAX_WBITS,
+                   ZLIB_MEM_LEVEL, Z_DEFAULT_STRATEGY) != Z_OK) {
+      _errorLog("[%s] deflateInit2 failed!", __FUNCTION__);
+      return false;
+    }
+
+  int deflate_result = Z_OK;
+  if (data && (data_len > 0)) {
+    _zstrm.next_in = reinterpret_cast<Bytef *>(const_cast<char *>(data));
+    _zstrm.avail_in = data_len;
+    deflate_result = runDeflateLoop(_zstrm, Z_FULL_FLUSH, cdata);
+    if (deflate_result != Z_OK) {
+      _errorLog("[%s] runDeflateLoop failed!", __FUNCTION__);
+
+      deflateEnd(&_zstrm);
+
+      return false;
+    }
+    _crc = crc32(_crc, reinterpret_cast<const Bytef *>(data), data_len);
+    _downstream_length += cdata.size();
+    _total_data_length += data_len;
+  }
+
+    deflateEnd(&_zstrm);    
+
+  return true;
+}
+
+bool EsiGzip::stream_finish(std::string &cdata, int&downstream_length) {
+  char buf[BUF_SIZE];
+
+    _zstrm.zalloc = Z_NULL;
+    _zstrm.zfree = Z_NULL;
+    _zstrm.opaque = Z_NULL;
+    if (deflateInit2(&_zstrm, COMPRESSION_LEVEL, Z_DEFLATED, -MAX_WBITS,
+                   ZLIB_MEM_LEVEL, Z_DEFAULT_STRATEGY) != Z_OK) {
+      _errorLog("[%s] deflateInit2 failed!", __FUNCTION__);
+      return false;
+    }
+
+  _zstrm.next_in = reinterpret_cast<Bytef *>(buf);
+  _zstrm.avail_in = 0; 
+  // required for the "finish" loop as no data has been given so far
+  int deflate_result = runDeflateLoop(_zstrm, Z_FINISH, cdata);
+  deflateEnd(&_zstrm);
+  if (deflate_result != Z_STREAM_END) {
+    _errorLog("[%s] deflateEnd failed!", __FUNCTION__);
+    downstream_length = 0;
+    return false;
+  }
+  append(cdata, static_cast<uint32_t>(_crc));
+  append(cdata, static_cast<int32_t>(_total_data_length));
+  _downstream_length += cdata.size();
+  downstream_length = _downstream_length;
+  return true;
+}
+
+EsiGzip::~EsiGzip() {
+  _downstream_length = 0;
+  _total_data_length = 0;
+}
+

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/522ec6cc/plugins/experimental/esi/lib/EsiGzip.h
----------------------------------------------------------------------
diff --git a/plugins/experimental/esi/lib/EsiGzip.h b/plugins/experimental/esi/lib/EsiGzip.h
new file mode 100644
index 0000000..1032d9e
--- /dev/null
+++ b/plugins/experimental/esi/lib/EsiGzip.h
@@ -0,0 +1,60 @@
+/** @file
+
+  A brief file description
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#ifndef _ESI_GZIP_H
+#define _ESI_GZIP_H
+
+#include "ComponentBase.h"
+#include <zlib.h>
+#include <string>
+
+class EsiGzip : private EsiLib::ComponentBase
+{
+
+public:
+
+  EsiGzip(const char *debug_tag,
+               EsiLib::ComponentBase::Debug debug_func, EsiLib::ComponentBase::Error error_func);
+
+  virtual ~EsiGzip();
+
+  bool stream_encode(const char *data, int data_len, std::string &cdata);
+
+  inline bool stream_encode(std::string data, std::string &cdata) {
+    return stream_encode(data.data(), data.size(), cdata);
+  }
+
+  bool stream_finish(std::string &cdata, int&downstream_length);
+
+private:
+
+  //int runDeflateLoop(z_stream &zstrm, int flush, std::string &cdata);
+  int _downstream_length;
+  int _total_data_length;
+  z_stream _zstrm;
+  uLong _crc;
+};
+
+
+#endif // _ESI_GZIP_H
+

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/522ec6cc/plugins/experimental/esi/lib/EsiProcessor.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/esi/lib/EsiProcessor.cc b/plugins/experimental/esi/lib/EsiProcessor.cc
index 15a3e87..ed4868b 100644
--- a/plugins/experimental/esi/lib/EsiProcessor.cc
+++ b/plugins/experimental/esi/lib/EsiProcessor.cc
@@ -44,6 +44,9 @@ EsiProcessor::EsiProcessor(const char *debug_tag, const char *parser_debug_tag,
     _curr_state(STOPPED),
     _parser(parser_debug_tag, debug_func, error_func),
     _n_prescanned_nodes(0),
+    _n_processed_nodes(0),
+    _n_processed_try_nodes(0),
+    _overall_len(0),
     _fetcher(fetcher), _esi_vars(variables),
     _expression(expression_debug_tag, debug_func, error_func, _esi_vars), _n_try_blocks_processed(0),
     _handler_manager(handler_mgr) {
@@ -145,6 +148,49 @@ EsiProcessor::_handleParseComplete() {
   return true;
 }
 
+DataStatus
+EsiProcessor::_getIncludeStatus(const DocNode &node) {
+  _debugLog(_debug_tag, "[%s] inside getIncludeStatus", __FUNCTION__);
+  if (node.type == DocNode::TYPE_INCLUDE) {
+    const Attribute &url = node.attr_list.front();
+
+    if(url.value_len == 0) { //allow empty url
+      return STATUS_DATA_AVAILABLE;
+    }
+
+    string raw_url(url.value, url.value_len);
+    StringHash::iterator iter = _include_urls.find(raw_url);
+    if (iter == _include_urls.end()) {
+      _errorLog("[%s] Data not requested for URL [%.*s]; no data to include",
+                __FUNCTION__, url.value_len, url.value);
+      return STATUS_ERROR;
+    }
+    const string &processed_url = iter->second;
+    DataStatus status = _fetcher.getRequestStatus(processed_url);
+    _debugLog(_debug_tag, "[%s] Got status %d successfully for URL [%.*s]", __FUNCTION__, status, 
+              processed_url.size(), processed_url.data());
+    return status;
+  } else if (node.type == DocNode::TYPE_SPECIAL_INCLUDE) {
+    AttributeList::const_iterator attr_iter;
+    for (attr_iter = node.attr_list.begin(); attr_iter != node.attr_list.end(); ++attr_iter) {
+      if (attr_iter->name == INCLUDE_DATA_ID_ATTR) {
+        break;
+      }
+    }
+    int include_data_id = attr_iter->value_len;
+    SpecialIncludeHandler *handler = 
+      reinterpret_cast<SpecialIncludeHandler *>(const_cast<char *>(attr_iter->value));
+    DataStatus status = handler->getIncludeStatus(include_data_id);
+    _debugLog(_debug_tag, "[%s] Successfully got status for special include with id %d",
+              __FUNCTION__, status, include_data_id);
+
+    return status;
+  }
+  _debugLog(_debug_tag, "[%s] node of type %s",
+            __FUNCTION__, DocNode::type_names_[node.type]);
+  return STATUS_DATA_AVAILABLE;
+}
+
 bool
 EsiProcessor::_getIncludeData(const DocNode &node, const char **content_ptr /* = 0 */,
                               int *content_len_ptr /* = 0 */) {
@@ -330,6 +376,169 @@ EsiProcessor::process(const char *&data, int &data_len) {
   return SUCCESS;
 }
 
+EsiProcessor::ReturnCode
+EsiProcessor::flush(string &data, int &overall_len) {
+
+  if (_curr_state == ERRORED) {
+    return FAILURE;
+  }
+  if(_curr_state == PROCESSED) {
+    overall_len = _overall_len;
+    data.assign("");
+    return SUCCESS;
+  }
+  if (_curr_state != WAITING_TO_PROCESS) {
+    _errorLog("[%s] Processor has to finish parsing via completeParse() before process() call", __FUNCTION__);
+    return FAILURE;
+  }
+  DocNodeList::iterator node_iter,iter;
+  bool attempt_succeeded;
+  bool attempt_pending;
+  bool node_pending;
+  std::vector<std::string> attemptUrls;
+  _output_data.clear();
+  TryBlockList::iterator try_iter = _try_blocks.begin();
+  for (int i = 0; i < _n_try_blocks_processed; ++i, ++try_iter);
+  for (; _n_try_blocks_processed < static_cast<int>(_try_blocks.size()); ++try_iter) {
+    attempt_pending=false;
+    for(node_iter=try_iter->attempt_nodes.begin(); node_iter!=try_iter->attempt_nodes.end(); ++node_iter) {
+      if((node_iter->type == DocNode::TYPE_INCLUDE) ||
+         (node_iter->type == DocNode::TYPE_SPECIAL_INCLUDE)) {
+        if(_getIncludeStatus(*node_iter) == STATUS_DATA_PENDING) {
+          attempt_pending = true;
+          break;
+        }
+      }
+    }
+    if(attempt_pending) {
+      break;
+    }
+  
+    ++_n_try_blocks_processed;
+    attempt_succeeded = true;
+    for (node_iter = try_iter->attempt_nodes.begin(); node_iter != try_iter->attempt_nodes.end(); ++node_iter) {
+      if ((node_iter->type == DocNode::TYPE_INCLUDE) ||
+          (node_iter->type == DocNode::TYPE_SPECIAL_INCLUDE)) {
+          const Attribute &url = (*node_iter).attr_list.front();              
+          string raw_url(url.value, url.value_len);
+          attemptUrls.push_back(_expression.expand(raw_url));
+        if (!_getIncludeStatus(*node_iter) == STATUS_ERROR) {
+          attempt_succeeded = false;
+          _errorLog("[%s] attempt section errored; due to url [%s]", __FUNCTION__, raw_url.c_str());
+          break;
+        }
+      }
+    }
+          
+    /* FAILURE CACHE */
+    FailureData* fdata=static_cast<FailureData*>(pthread_getspecific(threadKey));
+    _debugLog("plugin_esi_failureInfo","[%s]Fetched data related to thread specfic %p",__FUNCTION__,fdata);
+    
+    for (iter=try_iter->attempt_nodes.begin(); iter != try_iter->attempt_nodes.end(); ++iter) {
+      if ((iter->type == DocNode::TYPE_INCLUDE) || iter->type == DocNode::TYPE_SPECIAL_INCLUDE)
+      {
+          if(!attempt_succeeded && iter==node_iter)
+              continue;
+          const Attribute &url = (*iter).attr_list.front();              
+          string raw_url(url.value, url.value_len);
+          attemptUrls.push_back(_expression.expand(raw_url));
+      }
+    }
+   
+    if(attemptUrls.size()>0 && fdata)
+    { 
+        FailureData::iterator it =fdata->find(attemptUrls[0]);
+        FailureInfo* info;
+    
+        if(it == fdata->end())
+        {
+            _debugLog("plugin_esi_failureInfo","[%s]Inserting object for the attempt URLS",__FUNCTION__);
+            info=new FailureInfo(FAILURE_INFO_TAG,_debugLog,_errorLog);
+            for(int i=0;i<static_cast<int>(attemptUrls.size());i++)
+            {
+                _debugLog("plugin_esi_failureInfo", "[%s] Urls [%.*s]",__FUNCTION__,attemptUrls[i].size(),attemptUrls[i].data());
+                (*fdata)[attemptUrls[i]]=info;
+            }
+    
+            info->registerSuccFail(attempt_succeeded);
+
+        } else {
+            info=it->second;
+            //Should be registered only if attemp was made
+            //and it failed
+            if(_reqAdded)
+                info->registerSuccFail(attempt_succeeded);   
+    
+        }
+    }
+    if (attempt_succeeded) {
+      _debugLog(_debug_tag, "[%s] attempt section succeded; using attempt section", __FUNCTION__);
+      _node_list.splice(try_iter->pos, try_iter->attempt_nodes);
+    } else {
+      _debugLog(_debug_tag, "[%s] attempt section errored; trying except section", __FUNCTION__); 
+      int n_prescanned_nodes = 0;
+      if (!_preprocess(try_iter->except_nodes, n_prescanned_nodes)) {
+        _errorLog("[%s] Failed to preprocess except nodes", __FUNCTION__);
+      }
+      _node_list.splice(try_iter->pos, try_iter->except_nodes);
+      if (_fetcher.getNumPendingRequests()) { 
+        _debugLog(_debug_tag, "[%s] New fetch requests were triggered by except block; "
+                  "Returning NEED_MORE_DATA...", __FUNCTION__);
+      }
+    }
+  }
+
+  node_pending = false;
+  node_iter=_node_list.begin();
+  for(int i = 0;i< _n_processed_nodes;++i,++node_iter);
+  for(;node_iter!= _node_list.end(); ++node_iter) {
+    DocNode &doc_node = *node_iter; // handy reference
+    _debugLog(_debug_tag, "[%s] Processing ESI node [%s] with data of size %d starting with [%.10s...]",
+              __FUNCTION__, DocNode::type_names_[doc_node.type], doc_node.data_len,
+              (doc_node.data_len ? doc_node.data : "(null)"));
+
+    if(_getIncludeStatus(doc_node) == STATUS_DATA_PENDING) {
+      node_pending = true;
+      break;
+    }
+
+    _debugLog(_debug_tag, "[%s] processed node: %d, try blocks processed: %d, processed try nodes: %d", __FUNCTION__, _n_processed_nodes, _n_try_blocks_processed, _n_processed_try_nodes);
+    if(doc_node.type == DocNode::TYPE_TRY) {
+      if(_n_try_blocks_processed <= _n_processed_try_nodes) {
+        node_pending = true;
+        break;
+      } else {
+        ++_n_processed_try_nodes;
+      }
+    }
+   
+    _debugLog(_debug_tag, "[%s] really Processing ESI node [%s] with data of size %d starting with [%.10s...]", __FUNCTION__, DocNode::type_names_[doc_node.type], doc_node.data_len, (doc_node.data_len ? doc_node.data: "(null)"));
+
+    if (doc_node.type == DocNode::TYPE_PRE) {
+      // just copy the data
+      _output_data.append(doc_node.data, doc_node.data_len);
+      ++_n_processed_nodes;
+    } else if (!_processEsiNode(node_iter)) {
+      _errorLog("[%s] Failed to process ESI node [%.*s]", __FUNCTION__, doc_node.data_len, doc_node.data);
+      ++_n_processed_nodes;
+    } else {
+      ++_n_processed_nodes;
+    }
+  }
+
+  if(!node_pending) {
+    _curr_state = PROCESSED;
+    _addFooterData();
+  }
+  data.assign(_output_data);
+  _overall_len = _overall_len + data.size();
+  overall_len = _overall_len;
+
+  _debugLog(_debug_tag, "[%s] ESI processed document of size %d starting with [%.10s]",
+            __FUNCTION__, data.size(), (data.size() ? data.data() : "(null)"));
+  return SUCCESS;
+}
+
 void
 EsiProcessor::stop() {
   _output_data.clear();
@@ -338,6 +547,7 @@ EsiProcessor::stop() {
   _try_blocks.clear();
   _n_prescanned_nodes = 0;
   _n_try_blocks_processed = 0;
+  _overall_len = 0;
   for (IncludeHandlerMap::iterator map_iter = _include_handlers.begin();
        map_iter != _include_handlers.end(); ++map_iter) {
     delete map_iter->second;

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/522ec6cc/plugins/experimental/esi/lib/EsiProcessor.h
----------------------------------------------------------------------
diff --git a/plugins/experimental/esi/lib/EsiProcessor.h b/plugins/experimental/esi/lib/EsiProcessor.h
index eeae62a..b4f7309 100644
--- a/plugins/experimental/esi/lib/EsiProcessor.h
+++ b/plugins/experimental/esi/lib/EsiProcessor.h
@@ -80,6 +80,11 @@ public:
    * else FAILURE/SUCCESS is returned. */
   ReturnCode process(const char *&data, int &data_len);
 
+  /** Process the ESI document and flush processed data as much as 
+   * possible. Can be called when fetcher hasn't finished pulling 
+   * in all data. */
+  ReturnCode flush(std:: string &data, int &overall_len);
+ 
   /** returns packed version of document currently being processed */
   void packNodeList(std::string &buffer, bool retain_buffer_data) {
     return _node_list.pack(buffer, retain_buffer_data);
@@ -110,6 +115,9 @@ private:
   EsiParser _parser;
   EsiLib::DocNodeList _node_list;
   int _n_prescanned_nodes;
+  int _n_processed_nodes;
+  int _n_processed_try_nodes;
+  int _overall_len;
 
   HttpDataFetcher &_fetcher;
   EsiLib::StringHash _include_urls;
@@ -119,6 +127,7 @@ private:
   bool _processEsiNode(const EsiLib::DocNodeList::iterator &iter);
   bool _handleParseComplete();
   bool _getIncludeData(const EsiLib::DocNode &node, const char **content_ptr = 0, int *content_len_ptr = 0);
+  DataStatus _getIncludeStatus(const EsiLib::DocNode &node);
   bool _handleVars(const char *str, int str_len);
   bool _handleChoose(EsiLib::DocNodeList::iterator &curr_node);
   bool _handleTry(EsiLib::DocNodeList::iterator &curr_node);

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/522ec6cc/plugins/experimental/esi/plugin.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/esi/plugin.cc b/plugins/experimental/esi/plugin.cc
index 4548776..e7fd960 100644
--- a/plugins/experimental/esi/plugin.cc
+++ b/plugins/experimental/esi/plugin.cc
@@ -40,6 +40,7 @@
 
 #include "lib/Utils.h"
 #include "lib/gzip.h"
+#include "EsiGzip.h"
 #include "EsiProcessor.h"
 #include "HttpDataFetcher.h"
 #include "HandlerManager.h"
@@ -57,12 +58,14 @@ struct OptionInfo
   bool packed_node_support;
   bool private_response;
   bool disable_gzip_output;
+  bool first_byte_flush;
 };
 
 static HandlerManager *gHandlerManager = NULL;
 
 #define DEBUG_TAG "plugin_esi"
 #define PROCESSOR_DEBUG_TAG "plugin_esi_processor"
+#define GZIP_DEBUG_TAG "plugin_esi_gzip"
 #define PARSER_DEBUG_TAG "plugin_esi_parser"
 #define FETCHER_DEBUG_TAG "plugin_esi_fetcher"
 #define VARS_DEBUG_TAG "plugin_esi_vars"
@@ -91,11 +94,13 @@ struct ContData
   STATE curr_state;
   TSVIO input_vio;
   TSIOBufferReader input_reader;
+  TSVIO output_vio;
   TSIOBuffer output_buffer;
   TSIOBufferReader output_reader;
   Variables *esi_vars;
   HttpDataFetcherImpl *data_fetcher;
   EsiProcessor *esi_proc;
+  EsiGzip *esi_gzip;
   TSCont contp;
   TSHttpTxn txnp;
   const struct OptionInfo *option_info;
@@ -116,9 +121,9 @@ struct ContData
   list<string> post_headers;
 
   ContData(TSCont contptr, TSHttpTxn tx)
-    : curr_state(READING_ESI_DOC), input_vio(NULL),
+    : curr_state(READING_ESI_DOC), input_vio(NULL), output_vio(NULL),
       output_buffer(NULL), output_reader(NULL),
-      esi_vars(NULL), data_fetcher(NULL), esi_proc(NULL),
+      esi_vars(NULL), data_fetcher(NULL), esi_proc(NULL), esi_gzip(NULL), 
       contp(contptr), txnp(tx), request_url(NULL),
       input_type(DATA_TYPE_RAW_ESI), packed_node_list(""),
       gzipped_data(""), gzip_output(false),
@@ -212,10 +217,20 @@ ContData::init()
     }
     input_reader = TSVIOReaderGet(input_vio);
 
+    // get downstream VIO
+    TSVConn output_conn;
+    output_conn = TSTransformOutputVConnGet(contp);
+    if(!output_conn) {
+      TSError("[%s] Error while getting transform VC", __FUNCTION__);
+      goto lReturn;
+    }
     output_buffer = TSIOBufferCreate();
     output_reader = TSIOBufferReaderAlloc(output_buffer);
 
-    string fetcher_tag, vars_tag, expr_tag, proc_tag;
+    // we don't know how much data we are going to write, so INT_MAX
+    output_vio = TSVConnWrite(output_conn, contp, output_reader, INT64_MAX);
+
+    string fetcher_tag, vars_tag, expr_tag, proc_tag, gzip_tag;
     if (!data_fetcher) {
       data_fetcher = new HttpDataFetcherImpl(contp, client_addr,
                                              createDebugTag(FETCHER_DEBUG_TAG, contp, fetcher_tag));
@@ -228,6 +243,9 @@ ContData::init()
                                 createDebugTag(PARSER_DEBUG_TAG, contp, fetcher_tag),
                                 createDebugTag(EXPR_DEBUG_TAG, contp, expr_tag),
                                 &TSDebug, &TSError, *data_fetcher, *esi_vars, *gHandlerManager);
+    
+    esi_gzip = new EsiGzip(createDebugTag(GZIP_DEBUG_TAG, contp, gzip_tag), &TSDebug, &TSError);
+
     TSDebug(debug_tag, "[%s] Set input data type to [%s]", __FUNCTION__,
              DATA_TYPE_NAMES_[input_type]);
 
@@ -471,6 +489,9 @@ ContData::~ContData()
   if (esi_proc) {
     delete esi_proc;
   }
+  if(esi_gzip) {
+    delete esi_gzip;
+  }
 }
 
 static int removeCacheHandler(TSCont contp, TSEvent event, void *edata) {
@@ -610,7 +631,9 @@ transformData(TSCont contp)
       if (!cont_data->data_fetcher->isFetchComplete()) {
         TSDebug(cont_data->debug_tag,
                  "[%s] input_vio NULL, but data needs to be fetched. Returning control", __FUNCTION__);
-        return 1;
+        if(!cont_data->option_info->first_byte_flush) {
+          return 1;
+        }
       } else {
         TSDebug(cont_data->debug_tag,
                  "[%s] input_vio NULL, but processing needs to (and can) be completed", __FUNCTION__);
@@ -714,7 +737,8 @@ transformData(TSCont contp)
     }
   }
 
-  if (cont_data->curr_state == ContData::FETCHING_DATA) { // retest as state may have changed in previous block
+  if ((cont_data->curr_state == ContData::FETCHING_DATA) &&
+      (!cont_data->option_info->first_byte_flush)) { // retest as state may have changed in previous block
     if (cont_data->data_fetcher->isFetchComplete()) {
       TSDebug(cont_data->debug_tag, "[%s] data ready; going to process doc", __FUNCTION__);
       const char *out_data;
@@ -780,6 +804,88 @@ transformData(TSCont contp)
     }
   }
 
+  if ((cont_data->curr_state == ContData::FETCHING_DATA) &&
+      (cont_data->option_info->first_byte_flush)) { // retest as state may have changed in previous block
+    TSDebug(cont_data->debug_tag, "[%s] trying to process doc", __FUNCTION__);
+    string out_data;
+    string cdata;
+    int overall_len;
+    EsiProcessor::ReturnCode retval = cont_data->esi_proc->flush(out_data, overall_len);
+
+    if (cont_data->data_fetcher->isFetchComplete()) {
+      TSDebug(cont_data->debug_tag, "[%s] data ready; last process() will have finished the entire processing", __FUNCTION__);
+      cont_data->curr_state = ContData::PROCESSING_COMPLETE;
+    }
+
+    if (retval == EsiProcessor::SUCCESS) {
+      TSDebug(cont_data->debug_tag,
+                 "[%s] ESI processor output document of size %d starting with [%.10s]",
+                 __FUNCTION__, (int) out_data.size(), (out_data.size() ? out_data.data() : "(null)"));
+    } else {
+      TSError("[%s] ESI processor failed to process document; will return empty document", __FUNCTION__);
+      out_data.assign("");
+        
+      if(!cont_data->xform_closed) {
+        TSVIONBytesSet(cont_data->output_vio, 0);
+        TSVIOReenable(cont_data->output_vio);
+      }
+    }
+
+    // make sure transformation has not been prematurely terminated
+    if (!cont_data->xform_closed && out_data.size() > 0) {
+      if (cont_data->gzip_output) {
+        if (!cont_data->esi_gzip->stream_encode(out_data, cdata)) {
+          TSError("[%s] Error while gzipping content", __FUNCTION__);
+        } else {
+          TSDebug(cont_data->debug_tag, "[%s] Compressed document from size %d to %d bytes",
+                     __FUNCTION__, (int) out_data.size(), (int) cdata.size());
+        }
+      }
+
+      if (cont_data->gzip_output) {
+        if (TSIOBufferWrite(TSVIOBufferGet(cont_data->output_vio), cdata.data(), cdata.size()) == TS_ERROR) {
+          TSError("[%s] Error while writing bytes to downstream VC", __FUNCTION__);
+          return 0;
+        }
+      } else {
+        if (TSIOBufferWrite(TSVIOBufferGet(cont_data->output_vio), out_data.data(), out_data.size()) == TS_ERROR) {
+          TSError("[%s] Error while writing bytes to downstream VC", __FUNCTION__);
+          return 0;
+        }
+      }
+    }
+    if(!cont_data->xform_closed) {     
+      // should not set any fixed length
+      if(cont_data->curr_state == ContData::PROCESSING_COMPLETE) {
+        if(cont_data->gzip_output) {
+          string cdata;
+          int downstream_length;
+          if(!cont_data->esi_gzip->stream_finish(cdata, downstream_length)) {
+            TSError("[%s] Error while finishing gzip", __FUNCTION__);
+            return 0;  
+          } else {   
+            if (TSIOBufferWrite(TSVIOBufferGet(cont_data->output_vio), cdata.data(), cdata.size()) == TS_ERROR) {
+              TSError("[%s] Error while writing bytes to downstream VC", __FUNCTION__);
+              return 0;
+            }
+            TSDebug(cont_data->debug_tag,
+                 "[%s] ESI processed overall/gzip: %d",
+                 __FUNCTION__, downstream_length );
+            TSVIONBytesSet(cont_data->output_vio, downstream_length);          
+          }
+        } else {
+          TSDebug(cont_data->debug_tag,
+                 "[%s] ESI processed overall: %d",
+                 __FUNCTION__, overall_len );
+          TSVIONBytesSet(cont_data->output_vio, overall_len);
+        }
+      } 
+   
+      // Reenable the output connection so it can read the data we've produced.
+      TSVIOReenable(cont_data->output_vio);
+    }
+  }
+
   return 1;
 }
 
@@ -859,8 +965,14 @@ transformHandler(TSCont contp, TSEvent event, void *edata)
       transformData(contp);
       break;
 
+    case TS_EVENT_VCONN_WRITE_READY:
+      TSDebug(cont_debug_tag, "[%s] WRITE_READY", __FUNCTION__);
+      if(!cont_data->option_info->first_byte_flush) {
+        TSVConnShutdown(TSTransformOutputVConnGet(contp), 0, 1);
+      }
+      break;
+
     case TS_EVENT_VCONN_WRITE_COMPLETE:
-    case TS_EVENT_VCONN_WRITE_READY:     // we write only once to downstream VC
       TSDebug(cont_debug_tag, "[%s] shutting down transformation", __FUNCTION__);
       TSVConnShutdown(TSTransformOutputVConnGet(contp), 0, 1);
       break;
@@ -874,13 +986,15 @@ transformHandler(TSCont contp, TSEvent event, void *edata)
       if (is_fetch_event) {
         TSDebug(cont_debug_tag, "[%s] Handling fetch event %d...", __FUNCTION__, event);
         if (cont_data->data_fetcher->handleFetchEvent(event, edata)) {
-          if ((cont_data->curr_state == ContData::FETCHING_DATA) &&
-              cont_data->data_fetcher->isFetchComplete()) {
+          if (cont_data->curr_state == ContData::FETCHING_DATA) {
             // there's a small chance that fetcher is ready even before
             // parsing is complete; hence we need to check the state too
-            TSDebug(cont_debug_tag, "[%s] fetcher is ready with data, going into process stage",
+            if(cont_data->option_info->first_byte_flush ||
+               cont_data->data_fetcher->isFetchComplete()){
+              TSDebug(cont_debug_tag, "[%s] fetcher is ready with data, going into process stage",
                      __FUNCTION__);
-            transformData(contp);
+              transformData(contp);
+            }
           }
         } else {
           TSError("[%s] Could not handle fetch event!", __FUNCTION__);
@@ -1516,6 +1630,7 @@ static int esiPluginInit(int argc, const char *argv[], struct OptionInfo *pOptio
       { const_cast<char *>("packed-node-support"), no_argument, NULL, 'n' },
       { const_cast<char *>("private-response"), no_argument, NULL, 'p' },
       { const_cast<char *>("disable-gzip-output"), no_argument, NULL, 'z' },
+      { const_cast<char *>("first-byte-flush"), no_argument, NULL, 'b' },
       { const_cast<char *>("handler-filename"), required_argument, NULL, 'f' },
       { NULL, 0, NULL, 0 }
     };
@@ -1523,7 +1638,7 @@ static int esiPluginInit(int argc, const char *argv[], struct OptionInfo *pOptio
     optarg = NULL;
     optind = opterr = optopt = 0;
     int longindex = 0;
-    while ((c = getopt_long(argc, (char * const*) argv, "npzf:", longopts, &longindex)) != -1) {
+    while ((c = getopt_long(argc, (char * const*) argv, "npzbf:", longopts, &longindex)) != -1) {
       switch (c) {
         case 'n':
           pOptionInfo->packed_node_support = true;
@@ -1534,6 +1649,9 @@ static int esiPluginInit(int argc, const char *argv[], struct OptionInfo *pOptio
         case 'z':
           pOptionInfo->disable_gzip_output = true;
           break;
+        case 'b':
+          pOptionInfo->first_byte_flush = true;
+          break;
         case 'f':
           {
             Utils::KeyValueMap handler_conf;
@@ -1563,9 +1681,9 @@ static int esiPluginInit(int argc, const char *argv[], struct OptionInfo *pOptio
   if (result == 0) {
     TSDebug(DEBUG_TAG, "[%s] Plugin started%s, " \
         "packed-node-support: %d, private-response: %d, " \
-        "disable-gzip-output: %d", __FUNCTION__, bKeySet ? " and key is set" : "",
+        "disable-gzip-output: %d, first-byte-flush: %d ", __FUNCTION__, bKeySet ? " and key is set" : "",
         pOptionInfo->packed_node_support, pOptionInfo->private_response,
-        pOptionInfo->disable_gzip_output);
+        pOptionInfo->disable_gzip_output, pOptionInfo->first_byte_flush);
   }
 
   return result;