Posted to commits@couchdb.apache.org by da...@apache.org on 2010/06/11 01:51:24 UTC

svn commit: r953503 - in /couchdb/branches/new_replicator: ./ src/couchdb/Makefile.am src/couchdb/couch_api_wrap.erl src/couchdb/couch_httpd_rep.erl src/couchdb/couch_replicate.erl src/couchdb/couch_replicate.hrl src/couchdb/json_stream_parse.erl

Author: damien
Date: Thu Jun 10 23:51:24 2010
New Revision: 953503

URL: http://svn.apache.org/viewvc?rev=953503&view=rev
Log:
Initial check-in of a replicator that incrementally replicates attachments, so
that only attachments that have changed are replicated, and streams
attachments, so that buffering for push and pull isn't required. To enable the
new replicator, add this to [httpd_global_handlers] in the local.ini file:

    _replicate = {couch_httpd_rep, handle_req}

This isn't yet robust in dealing with common failures, and it doesn't yet
create the target database when it is missing, replicate docs by id, or do
continuous replication.
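
Once enabled, the endpoint accepts a JSON body with "source" and "target"
fields, as parsed by couch_httpd_rep:handle_req/1 below. A hypothetical
request (host and database names are illustrative):

    POST /_replicate HTTP/1.1
    Content-Type: application/json

    {"source": "sourcedb", "target": "http://example:5984/targetdb/"}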

Added:
    couchdb/branches/new_replicator/
      - copied from r953500, couchdb/trunk/
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
    couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicate.hrl
    couchdb/branches/new_replicator/src/couchdb/json_stream_parse.erl
Modified:
    couchdb/branches/new_replicator/src/couchdb/Makefile.am

Modified: couchdb/branches/new_replicator/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/Makefile.am?rev=953503&r1=953500&r2=953503&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/Makefile.am (original)
+++ couchdb/branches/new_replicator/src/couchdb/Makefile.am Thu Jun 10 23:51:24 2010
@@ -78,7 +78,12 @@ source_files = \
     couch_view_updater.erl \
     couch_view_group.erl \
     couch_db_updater.erl \
-    couch_work_queue.erl
+    couch_work_queue.erl \
+    couch_replicate.erl \
+    couch_httpd_rep.erl \
+    couch_api_wrap.erl \
+    json_stream_parse.erl
+
 
 EXTRA_DIST = $(source_files) couch_db.hrl
 
@@ -135,7 +140,11 @@ compiled_files = \
     couch_view_updater.beam \
     couch_view_group.beam \
     couch_db_updater.beam \
-    couch_work_queue.beam
+    couch_work_queue.beam \
+    couch_replicate.beam \
+    couch_httpd_rep.beam \
+    couch_api_wrap.beam \
+    json_stream_parse.beam
 
 # doc_base = \
 #     erlang.png \

Added: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=953503&view=auto
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (added)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Thu Jun 10 23:51:24 2010
@@ -0,0 +1,386 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_api_wrap).
+
+
+-include("couch_db.hrl").
+-include("couch_replicate.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+-export([
+    db_open/2,
+    get_db_info/1,
+    open_doc/3,
+    update_doc/3,
+    ensure_full_commit/1,
+    get_missing_revs/2,
+    open_doc_revs/6,
+    update_doc/4,
+    changes_since/5
+    ]).
+
+db_open(#httpdb{}=Db, _Options) ->
+    {ok, Db};
+db_open(DbName, Options) ->
+    couch_db:open(DbName,Options).
+
+get_db_info(#httpdb{url=Url,oauth=OAuth,headers=Headers}) ->
+    Headers2 = oauth_header(Url, [], get, OAuth) ++ Headers,
+    case ibrowse:send_req(Url, Headers2, get, [], [ 
+           {response_format,binary}
+           ], infinity) of
+    {ok, "200", _RespHeaders, Body} ->
+        {Props} = ?JSON_DECODE(Body),
+        {ok, [{couch_util:to_existing_atom(K), V} || {K,V} <- Props]}
+    end;
+get_db_info(Db) ->
+    couch_db:get_db_info(Db).
+
+
+open_doc(#httpdb{url=Url,oauth=OAuth,headers=Headers}, DocId, Options) ->
+    Url2 = Url ++ couch_util:url_encode(DocId),
+    QArgs = options_to_query_args(Options, []),
+    Headers2 = oauth_header(Url2, QArgs, get, OAuth) ++ Headers,
+    #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
+    {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
+    try ibrowse:send_req_direct(Worker, Url2 ++ query_args_to_string(QArgs, []), 
+            Headers2, get, [], [ 
+            {response_format,binary}
+            ], infinity) of
+    {ok, "200", _RespHeaders, Body} ->
+        {ok, couch_doc:from_json_obj(?JSON_DECODE(Body))};
+    {ok, "404", _RespHeaders, _Body} ->
+        {not_found, missing}
+    after
+        catch ibrowse:stop_worker_process(Worker)
+    end;
+open_doc(Db, DocId, Options) ->
+    couch_db:open_doc(Db, DocId, Options).
+
+update_doc(Db, Doc, Options) ->
+    update_doc(Db,Doc,Options,interactive_edit).
+
+ensure_full_commit(#httpdb{url=Url,oauth=OAuth,headers=Headers}) ->
+    Headers2 = oauth_header(Url, [], post, OAuth) ++ Headers,
+    #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
+    {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
+    case ibrowse:send_req_direct(Worker, Url ++ "_ensure_full_commit", Headers2, post, [], [ 
+           {response_format,binary}
+           ], infinity) of
+    {ok, "201", _RespHeaders, Body} ->
+        catch ibrowse:stop_worker_process(Worker),
+        {Props} = ?JSON_DECODE(Body),
+        {ok, couch_util:get_value(<<"instance_start_time">>,Props)}
+    end;
+ensure_full_commit(Db) ->
+    couch_db:ensure_full_commit(Db).
+
+get_missing_revs(#httpdb{url=Url,oauth=OAuth,headers=Headers}, IdRevs) ->
+    Json = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs],
+    Headers2 = oauth_header(Url, [], post, OAuth) ++ Headers,
+    #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
+    {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
+    case ibrowse:send_req_direct(Worker, Url ++ "_revs_diff", Headers2, post,
+            ?JSON_ENCODE({Json}), [ 
+           {response_format,binary}
+           ], infinity) of
+    {ok, "200", _RespHeaders, Body} ->
+        catch ibrowse:stop_worker_process(Worker),
+        {JsonResults} = ?JSON_DECODE(Body),
+        ConvertToNativeFun = fun({Id, {Result}}) ->
+                {Id,
+                couch_doc:parse_revs(couch_util:get_value(<<"missing">>,Result)),
+                couch_doc:parse_revs(
+                    couch_util:get_value(<<"possible_ancestors">>, Result, []))}
+            end,
+        {ok, lists:map(ConvertToNativeFun,JsonResults)}
+    end;
+get_missing_revs(Db, IdRevs) ->
+    couch_db:get_missing_revs(Db, IdRevs).
+
+
+options_to_query_args([], Acc) ->
+    lists:reverse(Acc);
+options_to_query_args([delay_commit|Rest], Acc) ->
+    options_to_query_args(Rest, Acc);
+options_to_query_args([{atts_since,[]}|Rest], Acc) ->
+    options_to_query_args(Rest, Acc);
+options_to_query_args([{atts_since,PossibleAncestors}|Rest], Acc) ->
+    options_to_query_args(Rest, [{"atts_since",?JSON_ENCODE(
+            couch_doc:revs_to_strs(PossibleAncestors))} | Acc]).
+
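+% Builds the query string portion of a request url; for example,
+% query_args_to_string([{"since","0"},{"style","all_docs"}], []) returns
+% "?since=0&style=all_docs", and an empty arg list returns "".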
+query_args_to_string([], []) ->
+    "";
+query_args_to_string([], Acc) ->
+    "?" ++ string:join(lists:reverse(Acc), "&");
+query_args_to_string([{K,V}|Rest], Acc) ->
+    query_args_to_string(Rest, [(K ++ "=" ++ V) | Acc]).
+
+open_doc_revs(#httpdb{url=Url,oauth=OAuth,headers=Headers}, Id, Revs, 
+        Options, Fun, Acc) ->
+    Self = self(),
+    QArgs = [{"revs", "true"},{"open_revs", ?JSON_ENCODE(couch_doc:revs_to_strs(Revs))} | 
+        options_to_query_args(Options, [])],
+    IdEncoded =
+    case Id of
+    <<"_design/",RestId/binary>> ->
+        "_design/" ++ couch_util:url_encode(RestId);
+    _ ->
+        couch_util:url_encode(Id)
+    end,
+    Headers2 = oauth_header(Url ++ IdEncoded, QArgs, get, OAuth) ++ [{"accept", "multipart/mixed"} | Headers],
+    Streamer = spawn_link(fun()->
+            FullUrl = Url ++ IdEncoded ++ query_args_to_string(QArgs, []),
+            #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
+            {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
+            {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, FullUrl, Headers2, get, [], [
+                {response_format,binary},
+                {stream_to, {self(), once}}
+                ], infinity),
+            
+            receive
+            {ibrowse_async_headers, ReqId, "200", RespHeaders} ->
+                CType = couch_util:get_value("Content-Type", RespHeaders),
+                couch_httpd:parse_multipart_request(CType, 
+                    fun() -> stream_data_self(ReqId) end,
+                    fun(Ev) -> mp_parse_mixed(Ev) end)
+            end,
+            catch ibrowse:stop_worker_process(Worker),
+            unlink(Self)
+        end),
+    receive_docs(Streamer, Fun, Acc);
+open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
+    {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
+    {ok, lists:foldl(Fun, Acc, Results)}.
+
+
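+% The functions below consume the multipart stream produced by the
+% mp_parse_mixed/1 callback: the consumer sends {get_headers, self()} or
+% {next_bytes, self()} to the streamer, which answers with
+% {headers, Headers} | done, or {body_bytes, Bytes} | body_done.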
+receive_docs(Streamer, UserFun, UserAcc) ->
+    Streamer ! {get_headers, self()},
+    receive
+    {headers, Headers} ->    
+        case couch_util:get_value("content-type", Headers) of
+        {"multipart/related", _} = ContentType ->
+            case couch_doc:doc_from_multi_part_stream(ContentType, 
+                 fun() -> receive_doc_data(Streamer) end) of
+            {ok, Doc} ->
+                UserAcc2 = UserFun({ok, Doc}, UserAcc),
+                receive_docs(Streamer, UserFun, UserAcc2)
+            end;
+        {"application/json", []} ->
+            Doc = couch_doc:from_json_obj(
+                    ?JSON_DECODE(receive_all(Streamer, []))),
+            UserAcc2 = UserFun({ok, Doc}, UserAcc),
+            receive_docs(Streamer, UserFun, UserAcc2);
+        {"application/json", [{"error","true"}]} ->
+            {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, [])),
+            Rev = couch_util:get_value(<<"missing">>, ErrorProps),
+            Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
+            UserAcc2 = UserFun(Result, UserAcc),
+            receive_docs(Streamer, UserFun, UserAcc2)
+        end;
+    done ->
+        {ok, UserAcc}
+    end.
+
+receive_all(Streamer, Acc)->
+    Streamer ! {next_bytes, self()},
+    receive
+    {body_bytes, Bytes} ->
+        receive_all(Streamer, [Bytes | Acc]);
+    body_done ->
+        lists:reverse(Acc)
+    end.
+    
+
+
+receive_doc_data(Streamer)->    
+    Streamer ! {next_bytes, self()},
+    receive
+    {body_bytes, Bytes} ->
+        {Bytes, fun() -> receive_doc_data(Streamer) end};
+    body_done ->
+        {<<>>, fun() -> receive_doc_data(Streamer) end}
+    end.
+
+
+mp_parse_mixed(eof) ->
+    receive {get_headers, From} ->
+        From ! done
+    end,
+    ok;
+mp_parse_mixed({headers, H}) ->
+    receive {get_headers, From} ->
+        From ! {headers, H}
+    end,
+    fun(Next) ->
+        mp_parse_mixed(Next)
+    end;
+mp_parse_mixed({body, Bytes}) ->
+    receive {next_bytes, From} ->
+        From ! {body_bytes, Bytes}
+    end,
+    fun (Next) ->
+        mp_parse_mixed(Next)
+    end;
+mp_parse_mixed(body_end) ->
+    receive {next_bytes, From} ->
+        From ! body_done;
+    {get_headers, From} ->
+        self() ! {get_headers, From}
+    end,
+    fun (Next) ->
+        mp_parse_mixed(Next)
+    end.
+
+update_doc(#httpdb{url=Url,headers=Headers,oauth=OAuth},Doc,Options,Type) ->
+    QArgs = if Type == replicated_changes ->
+        [{"new_edits", "false"}]; true -> [] end ++ 
+        options_to_query_args(Options, []),
+    
+    Boundary = couch_uuids:random(),
+    JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs,attachments,follows|Options])),
+    {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary,
+            JsonBytes, Doc#doc.atts, false),
+    Self = self(),
+    Headers2 = case lists:member(delay_commit, Options) of 
+            true -> [{"X-Couch-Full-Commit", "false"}];
+            false ->  []
+            end ++ [{"Content-Type", ?b2l(ContentType)}] ++ 
+            oauth_header(Url, QArgs, put, OAuth) ++ Headers,
+    Ref = make_ref(),
+    % this streams the doc data to the ibrowse requester
+    DocStreamer = spawn_link(fun() ->
+                couch_doc:doc_to_multi_part_stream(Boundary,
+                    JsonBytes, Doc#doc.atts,
+                    fun(Data) ->
+                        receive {get_data, Ref, Pid} ->
+                            Pid ! {data, Ref, Data}
+                        end
+                    end,
+                    false),
+                unlink(Self)
+            end),
+    #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
+    {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
+    case ibrowse:send_req_direct(Worker, Url ++ couch_util:url_encode(Doc#doc.id) ++ query_args_to_string(QArgs, []),
+            [{"Content-Length",Len}|Headers2], put, 
+            {fun(0) ->
+                eof;
+             (LenLeft) when LenLeft > 0 ->
+                DocStreamer ! {get_data, Ref, self()},
+                receive {data, Ref, Data} ->
+                    {ok, Data, LenLeft - iolist_size(Data)}
+                end
+            end, Len}, [], infinity) of
+    {ok, [$2,$0, _], _RespHeaders, Body} ->
+        catch ibrowse:stop_worker_process(Worker),
+        {Props} = ?JSON_DECODE(Body),
+        {ok, couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Props))}
+    end;
+update_doc(Db,Doc,Options,Type) ->
+    couch_db:update_doc(Db,Doc,Options,Type).
+
+changes_since(#httpdb{url=Url,headers=Headers,oauth=OAuth}, Style,
+        StartSeq, UserFun, Acc) ->
+    Url2 = Url ++ "_changes",
+    QArgs = [{"style", atom_to_list(Style)},
+            {"since", integer_to_list(StartSeq)}],
+    Headers2 = oauth_header(Url2, QArgs, get, OAuth) ++ Headers,        
+    #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
+    {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
+    {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, Url2 ++ query_args_to_string(QArgs, ""), 
+            Headers2, get, [], [
+            {response_format,binary},
+            {stream_to, {self(), once}}], infinity),
+    DataFun = fun() ->
+            receive {ibrowse_async_headers, ReqId, "200", _Headers} ->
+                stream_data_self(ReqId)
+            end
+        end,
+    EventFun = fun(Ev) ->
+            changes_ev1(Ev, UserFun, Acc)
+        end,
+    try
+        json_stream_parse:events(DataFun, EventFun)
+    after
+        catch ibrowse:stop_worker_process(Worker)
+    end;
+changes_since(Db, Style, StartSeq, UserFun, Acc) ->
+    couch_db:changes_since(Db, Style, StartSeq, UserFun, Acc).
+
+stream_data_self(ReqId) ->
+    ibrowse:stream_next(ReqId),
+    receive {ibrowse_async_response, ReqId, Data} ->
+        {Data, fun() -> stream_data_self(ReqId) end};
+    {ibrowse_async_response_end, ReqId} ->
+        {<<>>, fun() -> stream_data_self(ReqId) end}
+    end.
+
+changes_ev1(object_start, UserFun, UserAcc) ->
+    fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
+
+changes_ev2({key, <<"results">>}, UserFun, UserAcc) ->
+    fun(Ev)-> changes_ev3(Ev, UserFun, UserAcc) end;
+changes_ev2(_, UserFun, UserAcc) ->
+    fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
+
+changes_ev3(array_start, UserFun, UserAcc) ->
+    fun(Ev)-> changes_ev_loop(Ev, UserFun, UserAcc) end.
+
+changes_ev_loop(object_start, UserFun, UserAcc) ->
+    fun(Ev) ->
+        json_stream_parse:collect_object(Ev,
+                fun(Obj) ->
+                    UserAcc2 = UserFun(json_to_doc_info(Obj), UserAcc),
+                    fun(Ev2) -> changes_ev_loop(Ev2, UserFun, UserAcc2) end
+                end)
+    end;
+changes_ev_loop(array_end, _UserFun, _UserAcc) ->
+    fun(_Ev) -> changes_ev_done() end.
+
+changes_ev_done() ->
+    fun(_Ev) -> changes_ev_done() end.
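+
+% The changes_ev* functions above walk a _changes response shaped like this
+% (values illustrative):
+%   {"results":[
+%     {"seq":2,"id":"doc1","changes":[{"rev":"2-abc"}],"deleted":true}
+%   ],
+%    "last_seq":2}
+% Each object in the "results" array is handed to json_to_doc_info/1 below.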
+
+json_to_doc_info({Props}) ->
+    Id = couch_util:get_value(<<"id">>, Props),
+    Seq = couch_util:get_value(<<"seq">>, Props),
+    Changes = couch_util:get_value(<<"changes">>, Props),
+    
+    RevsInfo = lists:map(
+        fun({Change}) ->
+            Rev = couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Change)),
+            Del = ("true" == couch_util:get_value(<<"deleted">>, Change)),
+            #rev_info{rev=Rev,deleted=Del}
+        end, Changes),
+    #doc_info{id=Id,high_seq=Seq,revs=RevsInfo}.
+
+oauth_header(_Url, _QS, _Action, nil) ->
+    [];
+oauth_header(Url, QS, Action, OAuth) ->
+    Consumer =
+            {OAuth#oauth.consumer_key,
+            OAuth#oauth.consumer_secret,
+            OAuth#oauth.signature_method},
+    Method = case Action of
+        get -> "GET";
+        post -> "POST";
+        put -> "PUT";
+        head -> "HEAD"
+    end,
+    Params = oauth:signed_params(Method, Url, QS, Consumer,
+        OAuth#oauth.token,
+        OAuth#oauth.token_secret),
+    [{"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}].
+
+
+    
\ No newline at end of file

Added: couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl?rev=953503&view=auto
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl (added)
+++ couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl Thu Jun 10 23:51:24 2010
@@ -0,0 +1,99 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_httpd_rep).
+
+-include("couch_db.hrl").
+-include("couch_replicate.hrl").
+
+-import(couch_httpd,
+    [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2,
+    start_json_response/2,start_json_response/3,
+    send_chunk/2,last_chunk/1,end_json_response/1,
+    start_chunked_response/3, absolute_uri/2, send/2,
+    start_response_length/4]).
+    
+-export([handle_req/1]).
+
+maybe_add_trailing_slash(Url) ->
+    re:replace(Url, "[^/]$", "&/", [{return, list}]).
+
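+% A remote source or target may be given as a url string, or as an object
+% with optional oauth credentials and extra headers, e.g. (values
+% illustrative):
+%   {"url": "http://host:5984/db/",
+%    "headers": {"X-My-Header": "value"},
+%    "auth": {"oauth": {"consumer_key": "k", "consumer_secret": "s",
+%                       "token": "t", "token_secret": "ts"}}}
+% Anything else is treated as a local database name.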
+parse_rep_db({Props}) ->
+    Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
+    {AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}),
+    {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
+    Headers = [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders],
+    
+    case couch_util:get_value(<<"oauth">>, AuthProps) of
+    undefined ->
+        OAuth = nil;
+    {OauthProps} -> 
+        OAuth = #oauth{
+            consumer_key = 
+                ?b2l(couch_util:get_value(<<"consumer_key">>, OauthProps)),
+            token = 
+                ?b2l(couch_util:get_value(<<"token">>, OauthProps)),
+            token_secret = 
+                ?b2l(couch_util:get_value(<<"token_secret">>, OauthProps)),
+            consumer_secret = 
+                ?b2l(couch_util:get_value(<<"consumer_secret">>, OauthProps)),
+            signature_method = 
+                case couch_util:get_value(<<"signature_method">>, OauthProps) of
+                undefined ->        hmac_sha1;
+                <<"PLAINTEXT">> ->  plaintext;
+                <<"HMAC-SHA1">> ->  hmac_sha1;
+                <<"RSA-SHA1">> ->   rsa_sha1
+                end
+        }
+    end,
+    
+    #httpdb{
+        url = Url,
+        oauth = OAuth,
+        headers = Headers
+    };
+parse_rep_db(<<"http://",_/binary>>=Url) ->
+    parse_rep_db({[{<<"url">>,Url}]});
+parse_rep_db(<<"https://",_/binary>>=Url) ->
+    parse_rep_db({[{<<"url">>,Url}]});
+parse_rep_db(<<DbName/binary>>) ->
+    DbName.
+
+
+convert_options([])->
+    [];
+convert_options([{<<"create_target">>, V}|R])->
+    [{create_target, V}|convert_options(R)];
+convert_options([{<<"continuous">>, V}|R])->
+    [{create_target, V}|convert_options(R)];
+convert_options([{<<"filter">>, V}|R])->
+    [{filter, V}|convert_options(R)];
+convert_options([{<<"query_params">>, V}|R])->
+    [{query_params, V}|convert_options(R)];
+convert_options([_|R])-> % skip unknown option
+    convert_options(R).
+
+
+handle_req(#httpd{method='POST'}=Req) ->
+    {PostBody} = couch_httpd:json_body_obj(Req),
+    SrcDb = parse_rep_db(couch_util:get_value(<<"source">>, PostBody)),
+    TgtDb = parse_rep_db(couch_util:get_value(<<"target">>, PostBody)),
+    Options = convert_options(PostBody),
+    try couch_replicate:start(SrcDb, TgtDb, Options, Req#httpd.user_ctx) of
+    {ok, {HistoryResults}} ->
+        send_json(Req, {[{ok, true} | HistoryResults]})
+    catch
+    throw:{db_not_found, Msg} ->
+        send_json(Req, 404, {[{error, db_not_found}, {reason, Msg}]})
+    end;
+handle_req(Req) ->
+    send_method_not_allowed(Req, "POST").

Added: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=953503&view=auto
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (added)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Thu Jun 10 23:51:24 2010
@@ -0,0 +1,419 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicate).
+
+-export([start/4]).
+
+-include("couch_db.hrl").
+-include("couch_replicate.hrl").
+
+
+-record(rep_state, {
+    source_name,
+    target_name,
+    source,
+    target,
+    history,
+    checkpoint_history,
+    start_seq,
+    current_through_seq,
+    committed_seq,
+    source_log,
+    target_log,
+    rep_starttime,
+    src_starttime,
+    tgt_starttime,
+    timer % checkpoint timer
+    }).
+
+-record(stats, {
+    missing_checked=0,
+    missing_found=0,
+    docs_read=0,
+    docs_written=0,
+    doc_write_failures=0
+    }).
+        
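+% Pipeline overview: start/4 links three worker processes to this process,
+% which runs checkpoint_loop/3:
+%
+%   changes reader --ChangesQueue--> missing revs finder
+%       --MissingRevsQueue--> doc copier --> target
+%
+% The workers report progress with seq_start, seq_changes_done and add_stat
+% messages, which drive checkpointing.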
+start(Src, Tgt, Options, UserCtx) ->
+    
+    _Continuous = proplists:get_value(continuous, Options, false),
+    _CreateTarget = proplists:get_value(create_target, Options, false),
+    
+    #rep_state{source=Source,target=Target,start_seq=StartSeq} = State = 
+            init_state(Src, Tgt, Options, UserCtx), 
+    
+    {ok, ChangesQueue} = couch_work_queue:new(100000, 500),
+    {ok, MissingRevsQueue} = couch_work_queue:new(100000, 500),
+    
+    spawn_changes_reader(self(), StartSeq, Source, ChangesQueue),
+    spawn_missing_revs_finder(self(), Target, ChangesQueue, MissingRevsQueue),
+    spawn_doc_copy(self(), Source, Target, MissingRevsQueue),
+    
+    {ok, State2, _Stats} = checkpoint_loop(State, gb_trees:from_orddict([]),
+            #stats{}),
+    {ok, State2#rep_state.checkpoint_history}.
+    
+
+spawn_changes_reader(Cp, StartSeq, Source, ChangesQueue) ->
+    spawn_link(
+        fun()->
+            couch_api_wrap:changes_since(Source, all_docs, StartSeq,
+                fun(#doc_info{high_seq=Seq,revs=Revs}=DocInfo, _)->
+                    Cp ! {seq_start, {Seq, length(Revs)}},
+                    Cp ! {add_stat, {#stats.missing_checked, length(Revs)}},
+                    ok = couch_work_queue:queue(ChangesQueue, DocInfo),
+                    {ok, ok}
+                end, ok),
+            couch_work_queue:close(ChangesQueue)
+        end).
+
+
+init_state(Src,Tgt,Options,UserCtx)->    
+    
+    {ok, Source} = couch_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
+    {ok, Target} = couch_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}]),
+
+    {ok, SourceInfo} = couch_api_wrap:get_db_info(Source),
+    {ok, TargetInfo} = couch_api_wrap:get_db_info(Target),
+
+    RepId = make_replication_id(Src, Tgt, UserCtx, Options),
+    DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+    SourceLog = case couch_api_wrap:open_doc(Source, DocId, []) of
+    {ok, SrcLogDoc} -> SrcLogDoc;
+    _ ->              #doc{id=DocId}
+    end,
+    TargetLog = case couch_api_wrap:open_doc(Target, DocId, []) of
+    {ok, TgtLogDoc} -> TgtLogDoc;
+    _ ->              #doc{id=DocId}
+    end,
+    {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
+    #doc{body={CheckpointHistory}} = SourceLog,
+    State = #rep_state{
+        source_name = Src,
+        target_name = Tgt,
+        source = Source,
+        target = Target,
+        history = History,
+        checkpoint_history = {[{<<"no_changes">>, true}| CheckpointHistory]},
+        start_seq = StartSeq,
+        current_through_seq = StartSeq,
+        committed_seq = StartSeq,
+        source_log = SourceLog,
+        target_log = TargetLog,
+        rep_starttime = httpd_util:rfc1123_date(),
+        src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
+        tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo)
+    },
+    State#rep_state{timer = erlang:start_timer(checkpoint_interval(State), 
+            self(), timed_checkpoint)}.
+
+
+checkpoint_loop(State, SeqsInProgress, Stats) ->
+    receive
+    {seq_start, {Seq, NumChanges}} ->
+        SeqsInProgress2 = gb_trees:insert(Seq, NumChanges, SeqsInProgress),
+        checkpoint_loop(State, SeqsInProgress2, Stats);
+    {seq_changes_done, {Seq, NumChangesDone}} ->
+        TotalChanges = gb_trees:get(Seq, SeqsInProgress),
+        case TotalChanges - NumChangesDone of
+        0 ->
+            State2 =
+            case gb_trees:smallest(SeqsInProgress) of
+            {Seq, _} ->
+                State#rep_state{current_through_seq=Seq};
+            _ ->
+                State
+            end,
+            checkpoint_loop(State2, 
+                    gb_trees:delete(Seq,SeqsInProgress), Stats);
+        NewTotalChanges when NewTotalChanges > 0 ->
+            SeqsInProgress2 =
+                gb_trees:update(Seq, NewTotalChanges, SeqsInProgress),
+            checkpoint_loop(State, SeqsInProgress2, Stats)
+        end;
+    {add_stat, {StatPos, Val}} ->
+        Stat = element(StatPos, Stats),
+        Stats2 = setelement(StatPos, Stats, Stat + Val),
+        checkpoint_loop(State, SeqsInProgress, Stats2);
+    done ->
+        io:format("checkpoint_loop done~n"),
+        0 = gb_trees:size(SeqsInProgress),
+        State2 = do_checkpoint(State, Stats),
+        erlang:cancel_timer(State2#rep_state.timer),
+        % erlang:start_timer/3 delivers {timeout, Ref, Msg}; flush any
+        % timeout that fired before the cancel
+        receive {timeout, _, timed_checkpoint} -> ok
+        after 0 -> ok
+        end,
+        {ok, State2, Stats};
+    {timeout, _, timed_checkpoint} ->
+        State2 = do_checkpoint(State, Stats),
+        Timer = erlang:start_timer(checkpoint_interval(State), 
+                self(), timed_checkpoint),
+        checkpoint_loop(State2#rep_state{timer=Timer}, SeqsInProgress, Stats)
+    end.
+
+
+checkpoint_interval(_State) ->
+    5000.
+
+do_checkpoint(#rep_state{current_through_seq=Seq,committed_seq=OldSeq}=State,
+        _Stats) when Seq == OldSeq ->
+    State;
+do_checkpoint(State, Stats) ->
+    #rep_state{
+        source_name=SourceName,
+        target_name=TargetName,
+        source = Source,
+        target = Target,
+        history = OldHistory,
+        start_seq = StartSeq,
+        current_through_seq = NewSeq,
+        source_log = SourceLog,
+        target_log = TargetLog,
+        rep_starttime = ReplicationStartTime,
+        src_starttime = SrcInstanceStartTime,
+        tgt_starttime = TgtInstanceStartTime
+    } = State,
+    case commit_to_both(Source, Target) of
+    {SrcInstanceStartTime, TgtInstanceStartTime} ->
+        ?LOG_INFO("recording a checkpoint for ~p -> ~p at source update_seq ~p",
+            [SourceName, TargetName, NewSeq]),
+        SessionId = couch_uuids:random(),
+        NewHistoryEntry = {[
+            {<<"session_id">>, SessionId},
+            {<<"start_time">>, list_to_binary(ReplicationStartTime)},
+            {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+            {<<"start_last_seq">>, StartSeq},
+            {<<"end_last_seq">>, NewSeq},
+            {<<"recorded_seq">>, NewSeq},
+            {<<"missing_checked">>, Stats#stats.missing_checked},
+            {<<"missing_found">>, Stats#stats.missing_found},
+            {<<"docs_read">>, Stats#stats.docs_read},
+            {<<"docs_written">>, Stats#stats.docs_written},
+            {<<"doc_write_failures">>, Stats#stats.doc_write_failures}
+        ]},
+        % limit history to 50 entries
+        NewRepHistory = {[
+            {<<"session_id">>, SessionId},
+            {<<"source_last_seq">>, NewSeq},
+            {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
+        ]},
+
+        try
+        {ok, {SrcRevPos,SrcRevId}} = couch_api_wrap:update_doc(Source, 
+                SourceLog#doc{body=NewRepHistory}, [delay_commit]),
+        {ok, {TgtRevPos,TgtRevId}} = couch_api_wrap:update_doc(Target, 
+                TargetLog#doc{body=NewRepHistory}, [delay_commit]),
+        State#rep_state{
+            checkpoint_history = NewRepHistory,
+            committed_seq = NewSeq,
+            source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+            target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+        }
+        catch throw:conflict ->
+        ?LOG_ERROR("checkpoint failure: conflict (are you replicating to "
+            "yourself?)", []),
+        State
+        end;
+    _Else ->
+        ?LOG_INFO("rebooting ~p -> ~p from last known replication checkpoint",
+            [SourceName, TargetName]),
+        throw(restart)
+    end.
+
+
+commit_to_both(Source, Target) ->
+    % commit the src async
+    ParentPid = self(),
+    SrcCommitPid = spawn_link(fun() ->
+            ParentPid ! {self(), couch_api_wrap:ensure_full_commit(Source)} end),
+
+    % commit tgt sync
+    {ok, TargetStartTime} = couch_api_wrap:ensure_full_commit(Target),
+
+    SourceStartTime =
+    receive
+    {SrcCommitPid, {ok, Timestamp}} ->
+        Timestamp;
+    {'EXIT', SrcCommitPid, _} ->
+        exit(replication_link_failure)
+    end,
+    {SourceStartTime, TargetStartTime}.
+
+
+spawn_missing_revs_finder(StatsProcess, 
+        Target, ChangesQueue, MissingRevsQueue) ->
+    spawn_link(fun() ->
+        missing_revs_finder_loop(StatsProcess, 
+                Target, ChangesQueue, MissingRevsQueue)
+        end).
+
+
+remove_missing(IdRevsSeqDict, []) ->
+    IdRevsSeqDict;
+remove_missing(IdRevsSeqDict, [{MissingId, MissingRevs, _}|Rest]) ->
+    {AllChangedRevs, Seq} = dict:fetch(MissingId, IdRevsSeqDict),
+    case AllChangedRevs -- MissingRevs of
+    [] ->
+        remove_missing(dict:erase(MissingId, IdRevsSeqDict), Rest);
+    NotMissingRevs ->
+        IdRevsSeqDict2 =
+                dict:store(MissingId, {NotMissingRevs, Seq}, IdRevsSeqDict),
+        remove_missing(IdRevsSeqDict2, Rest)
+    end.
+
+
+missing_revs_finder_loop(Cp, 
+        Target, ChangesQueue, MissingRevsQueue) ->
+    case couch_work_queue:dequeue(ChangesQueue) of
+    closed ->
+        io:format("missing_revs_finder_loop done~n"),
+        couch_work_queue:close(MissingRevsQueue);
+    {ok, DocInfos} ->
+        IdRevs = [{Id, [Rev || #rev_info{rev=Rev} <- RevsInfo]} ||
+                #doc_info{id=Id,revs=RevsInfo} <- DocInfos],
+        {ok, Missing} = couch_api_wrap:get_missing_revs(Target, IdRevs),
+        IdRevsSeqDict = dict:from_list(
+            [{Id, {[Rev || #rev_info{rev=Rev} <- RevsInfo], Seq}} ||
+                    #doc_info{id=Id,revs=RevsInfo,high_seq=Seq} <- DocInfos]),
+        NonMissingIdRevsSeqDict = remove_missing(IdRevsSeqDict, Missing),
+        % signal completion of the changes that aren't missing on the target
+        lists:foreach(fun({_Id, {Revs, Seq}})->
+                Cp ! {seq_changes_done, {Seq, length(Revs)}}
+            end, dict:to_list(NonMissingIdRevsSeqDict)),
+        % Expand out each into its own work item
+        lists:foreach(fun({Id, Revs, PAs})->
+            % PA means "possible ancestor"
+            Cp ! {add_stat, {#stats.missing_found, length(Revs)}},
+            {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
+            ok = couch_work_queue:queue(MissingRevsQueue,
+                {Id, Revs, PAs, Seq})
+            end, Missing),
+        missing_revs_finder_loop(Cp, Target, ChangesQueue, 
+                MissingRevsQueue)
+    end.
+
+
+spawn_doc_copy(Cp, Source, Target, MissingRevsQueue) ->
+    spawn_link(fun() ->
+        doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
+    end).
+
+
+doc_copy_loop(Cp, Source, Target, MissingRevsQueue) ->
+    case couch_work_queue:dequeue(MissingRevsQueue,1) of
+    closed ->
+        io:format("doc_copy_loop done~n"),
+        Cp ! done;
+    {ok, [{Id, Revs, PossibleAncestors, Seq}]} ->
+        couch_api_wrap:open_doc_revs(Source, Id, Revs,
+                [{atts_since,PossibleAncestors}],
+                fun({ok, Doc}, _) ->
+                    Cp ! {add_stat, {#stats.docs_read, 1}},
+                    case couch_api_wrap:update_doc(Target, Doc, [],
+                            replicated_changes) of
+                    {ok, _} ->
+                        Cp ! {add_stat, {#stats.docs_written, 1}};
+                    _Error ->
+                        Cp ! {add_stat, {#stats.doc_write_failures, 1}}
+                    end;
+                (_, _) ->
+                    io:format("doc error!!!!!!~n"),
+                    ok
+                end, []),
+        Cp ! {seq_changes_done, {Seq, length(Revs)}},
+        doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
+    end.
+
+
+make_replication_id(Source, Target, UserCtx, Options) ->
+    %% funky algorithm to preserve backwards compatibility
+    {ok, HostName} = inet:gethostname(),
+    % Port = mochiweb_socket_server:get(couch_httpd, port),
+    Src = get_rep_endpoint(UserCtx, Source),
+    Tgt = get_rep_endpoint(UserCtx, Target),
+    Base = [HostName, Src, Tgt] ++
+        case proplists:get_value(filter, Options) of
+        undefined ->
+            [];
+        Filter ->
+            [Filter, proplists:get_value(query_params, Options, {[]})]
+        end,
+    couch_util:to_hex(erlang:md5(term_to_binary(Base))).
+
+get_rep_endpoint(_UserCtx, #httpdb{url=Url,headers=Headers,oauth=OAuth}) ->
+    case OAuth of
+    nil ->
+        {remote, Url, Headers};
+    #oauth{} ->
+        {remote, Url, Headers, OAuth}
+    end;
+get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
+    {local, DbName, UserCtx}.
+
+
+compare_replication_logs(SrcDoc, TgtDoc) ->
+    #doc{body={RepRecProps}} = SrcDoc,
+    #doc{body={RepRecPropsTgt}} = TgtDoc,
+    case couch_util:get_value(<<"session_id">>, RepRecProps) ==
+            couch_util:get_value(<<"session_id">>, RepRecPropsTgt) of
+    true ->
+        % if the records have the same session id,
+        % then we have a valid replication history
+        OldSeqNum = couch_util:get_value(<<"source_last_seq">>, RepRecProps, 0),
+        OldHistory = couch_util:get_value(<<"history">>, RepRecProps, []),
+        {OldSeqNum, OldHistory};
+    false ->
+        SourceHistory = couch_util:get_value(<<"history">>, RepRecProps, []),
+        TargetHistory = couch_util:get_value(<<"history">>, RepRecPropsTgt, []),
+        ?LOG_INFO("Replication records differ. "
+                "Scanning histories to find a common ancestor.", []),
+        ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+                [RepRecProps, RepRecPropsTgt]),
+        compare_rep_history(SourceHistory, TargetHistory)
+    end.
+
+compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
+    ?LOG_INFO("no common ancestry -- performing full replication", []),
+    {0, []};
+compare_rep_history([{S}|SourceRest], [{T}|TargetRest]=Target) ->
+    SourceId = couch_util:get_value(<<"session_id">>, S),
+    case has_session_id(SourceId, Target) of
+    true ->
+        RecordSeqNum = couch_util:get_value(<<"recorded_seq">>, S, 0),
+        ?LOG_INFO("found a common replication record with source_seq ~p",
+            [RecordSeqNum]),
+        {RecordSeqNum, SourceRest};
+    false ->
+        TargetId = couch_util:get_value(<<"session_id">>, T),
+        case has_session_id(TargetId, SourceRest) of
+        true ->
+            RecordSeqNum = couch_util:get_value(<<"recorded_seq">>, T, 0),
+            ?LOG_INFO("found a common replication record with source_seq ~p",
+                [RecordSeqNum]),
+            {RecordSeqNum, TargetRest};
+        false ->
+            compare_rep_history(SourceRest, TargetRest)
+        end
+    end.
+
+
+has_session_id(_SessionId, []) ->
+    false;
+has_session_id(SessionId, [{Props} | Rest]) ->
+    case couch_util:get_value(<<"session_id">>, Props, nil) of
+    SessionId ->
+        true;
+    _Else ->
+        has_session_id(SessionId, Rest)
+    end.
+

Added: couchdb/branches/new_replicator/src/couchdb/couch_replicate.hrl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.hrl?rev=953503&view=auto
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.hrl (added)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.hrl Thu Jun 10 23:51:24 2010
@@ -0,0 +1,27 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+
+-record(httpdb, {
+    url,
+    oauth=nil,
+    headers = []
+}).
+
+-record(oauth, {
+    consumer_key,
+    token,
+    token_secret,
+    consumer_secret,
+    signature_method
+}).
\ No newline at end of file

Added: couchdb/branches/new_replicator/src/couchdb/json_stream_parse.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/json_stream_parse.erl?rev=953503&view=auto
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/json_stream_parse.erl (added)
+++ couchdb/branches/new_replicator/src/couchdb/json_stream_parse.erl Thu Jun 10 23:51:24 2010
@@ -0,0 +1,547 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(json_stream_parse).
+
+
+-export([events/2,to_ejson/1,test_all/0,collect_object/2]).
+
+-define(IS_WS(X), (X == $\  orelse X == $\t orelse X == $\n orelse X == $\r)).
+-define(IS_DELIM(X), (X == $} orelse X == $] orelse X == $,)).
+-define(IS_DIGIT(X), (X >= $0 andalso X =< $9)).
+
+
+
+% Parses the json into events.
+%
+% The DataFun param is a function that produces the data for parsing. When 
+% called it must yield a tuple, or the atom done. The first element in the 
+% tuple is the data itself, and the second element is a function to be called 
+% next to get the next chunk of data in the stream.
+%
+% The EventFun is called every time a json element is parsed. It must produce
+% a new function to be called for the next event.
+%
+% Events happen each time a new element in the json string is parsed.
+% For simple value types, the data itself is returned:
+% Strings
+% Integers
+% Floats
+% true
+% false
+% null
+%
+% For arrays, the start of the array is signaled by the event array_start
+% atom. The end is signaled by array_end. The events before the end are the
+% values, or nested values.
+% 
+% For objects, the start of the object is signaled by the event object_start
+% atom. The end is signaled by object_end. Each key is signaled by 
+% {key, KeyString}, and the following event is the value, or start of the 
+% value (array_start, object_start).
+%
+events(Data,EventFun) when is_list(Data)->
+    events(list_to_binary(Data),EventFun);
+events(Data,EventFun) when is_binary(Data)->
+    events(fun() -> {Data, fun() -> done end} end,EventFun);
+events(DataFun,EventFun) ->
+    parse_one(DataFun, EventFun, <<>>).
+
+% Converts the JSON directly to the Erlang representation of JSON (EJSON).
+to_ejson(DF) ->
+    {_DF2, EF, _Rest} = events(DF, fun(Ev) -> collect_events(Ev, []) end),
+    [[EJson]] = make_ejson(EF(get_results), [[]]),
+    EJson.
+
+
+% This function is used to return complete objects while parsing streams.
+%
+% Return this function from inside an event function right after getting an 
+% object_start event. It then collects the remaining events for that object 
+% and converts it to the Erlang representation of JSON.
+%
+% It then calls your ReturnControl function with the Erlang object. Your
+% ReturnControl function should then return another event function.
+%
+% This example stream parses an array of objects, calling 
+% fun do_something_with_the_object/1 for each object.
+%
+%    ev_array(array_start) ->
+%        fun(Ev) -> ev_object_loop(Ev) end.
+%        
+%    ev_object_loop(object_start) ->
+%        fun(Ev) ->
+%            json_stream_parse:collect_object(Ev,
+%                fun(Obj) ->
+%                    do_something_with_the_object(Obj),
+%                    fun(Ev2) -> ev_object_loop(Ev2) end
+%                end)
+%        end;
+%    ev_object_loop(array_end) ->
+%        ok.
+%    
+%    % invoke the parse
+%    main() ->
+%        ...
+%        events(Data, fun(Ev) -> ev_array(Ev) end).
+    
+collect_object(Ev, ReturnControl) ->
+    collect_object(Ev, 0, ReturnControl, [object_start]).
+
+
+
+% internal methods
+
+parse_one(DF,EF,Acc) ->
+    case toke(DF, Acc) of
+    none ->
+        none;
+    {Token, DF2, Rest} ->
+        case Token of
+        "{" ->
+            EF2 = EF(object_start),
+            {DF3, EF3, Rest2} = parse_object(DF2, EF2, Rest),
+            {DF3, EF3(object_end), Rest2};
+        "[" ->
+            EF2 = EF(array_start),
+            {DF3, EF3, Rest2} = parse_array(DF2, EF2, Rest),
+            {DF3, EF3(array_end), Rest2};
+        Int when is_integer(Int)->
+            {DF2, EF(Int), Rest};
+        Float when is_float(Float)->
+            {DF2, EF(Float), Rest};
+        Atom when is_atom(Atom)->
+            {DF2, EF(Atom), Rest};
+        String when is_binary(String)->
+            {DF2, EF(String), Rest};
+        _OtherToken ->
+            err(unexpected_token)
+        end
+    end.
+
+must_parse_one(DF,EF,Acc,Error)->
+    case parse_one(DF, EF, Acc) of
+    none ->
+        err(Error);
+    Else ->
+        Else
+    end.
+
+must_toke(DF, Data, Error) ->
+    case toke(DF, Data) of
+    none ->
+        err(Error);
+    Result ->
+        Result
+    end.
+
+toke(DF, <<>>) ->
+    case DF() of
+    done ->
+        none;
+    {Data, DF2} ->
+        toke(DF2, Data)
+    end;
+toke(DF, <<C,Rest/binary>>) when ?IS_WS(C)->
+    toke(DF, Rest);
+toke(DF, <<${,Rest/binary>>) ->
+    {"{", DF, Rest};
+toke(DF, <<$},Rest/binary>>) ->
+    {"}", DF, Rest};
+toke(DF, <<$[,Rest/binary>>) ->
+    {"[", DF, Rest};
+toke(DF, <<$],Rest/binary>>) ->
+    {"]", DF, Rest};
+toke(DF, <<$",Rest/binary>>) ->
+    toke_string(DF,Rest,[]);
+toke(DF, <<$,,Rest/binary>>) ->
+    {",", DF, Rest};
+toke(DF, <<$:,Rest/binary>>) ->
+    {":", DF, Rest};
+toke(DF, <<$-,Rest/binary>>) ->
+    {<<C,_/binary>> = Data, DF2} = must_df(DF,1,Rest,expected_number),
+    case ?IS_DIGIT(C) of
+    true ->
+        toke_number_leading(DF2, Data, "-");
+    false ->
+        err(expected_number)
+    end;
+toke(DF, <<C,_/binary>> = Data) when ?IS_DIGIT(C) ->
+    toke_number_leading(DF, Data, []);
+toke(DF, <<$t,Rest/binary>>) ->
+    {Data, DF2} = must_match(<<"rue">>, DF, Rest),
+    {true, DF2, Data};
+toke(DF, <<$f,Rest/binary>>) ->
+    {Data, DF2} = must_match(<<"alse">>, DF, Rest),
+    {false, DF2, Data};
+toke(DF, <<$n,Rest/binary>>) ->
+    {Data, DF2} = must_match(<<"ull">>, DF, Rest),
+    {null, DF2, Data};
+toke(_, _) ->
+    err(bad_token).
+
+
+must_match(Pattern, DF, Data) ->
+    Size = size(Pattern),
+    case must_df(DF, Size, Data, bad_token) of
+    {<<Pattern:Size/binary,Data2/binary>>, DF2} ->
+        {Data2, DF2};
+    {_, _} ->
+        err(bad_token)
+    end.
+
+must_df(DF,Error)->
+    case DF() of
+    done ->
+        err(Error);
+    {Data, DF2} ->
+        {Data, DF2}
+    end.
+
+
+must_df(DF,NeedLen,Acc,Error)->
+    if size(Acc) >= NeedLen ->
+        {Acc, DF};
+    true ->
+        case DF() of
+        done ->
+            err(Error);
+        {Data, DF2} ->
+            must_df(DF2, NeedLen, <<Acc/binary, Data/binary>>, Error)
+        end
+    end.
+
+
+parse_object(DF,EF,Acc) ->
+    case must_toke(DF, Acc, unterminated_object) of
+    {String, DF2, Rest} when is_binary(String)->
+        EF2 = EF({key,String}),
+        case must_toke(DF2,Rest,unterminated_object) of
+        {":", DF3, Rest2} ->
+            {DF4, EF3, Rest3} = must_parse_one(DF3, EF2, Rest2, expected_value),
+            case must_toke(DF4,Rest3, unterminated_object) of
+            {",", DF5, Rest4} ->
+                parse_object(DF5, EF3, Rest4);
+            {"}", DF5, Rest4} ->
+                {DF5, EF3, Rest4};
+            {_, _, _} ->
+                err(unexpected_token)
+            end;
+        _Else ->
+            err(expected_colon)
+        end;
+    {"}", DF2, Rest} ->
+        {DF2, EF, Rest};
+    {_, _, _} ->
+        err(unexpected_token)
+    end.
+
+parse_array0(DF,EF,Acc) ->
+    case toke(DF, Acc) of
+    none ->
+        err(unterminated_array);
+    {",", DF2, Rest} ->
+        parse_array(DF2,EF,Rest);
+    {"]", DF2, Rest} ->
+        {DF2,EF,Rest};
+    _ ->
+        err(unexpected_token)
+    end.
+    
+parse_array(DF,EF,Acc) ->
+    case toke(DF, Acc) of
+    none ->
+         err(unterminated_array);
+    {Token, DF2, Rest} ->
+        case Token of
+        "{" ->
+            EF2 = EF(object_start),
+            {DF3, EF3, Rest2} = parse_object(DF2, EF2, Rest),
+            parse_array0(DF3, EF3(object_end), Rest2);
+        "[" ->
+            EF2 = EF(array_start),
+            {DF3, EF3, Rest2} = parse_array(DF2, EF2, Rest),
+            parse_array0(DF3, EF3(array_end), Rest2);
+        Int when is_integer(Int)->
+            parse_array0(DF2, EF(Int), Rest);
+        Float when is_float(Float)->
+            parse_array0(DF2, EF(Float), Rest);
+        Atom when is_atom(Atom)->
+            parse_array0(DF2, EF(Atom), Rest);
+        String when is_binary(String)->
+            parse_array0(DF2, EF(String), Rest);
+        "]" ->
+            {DF2, EF, Rest};
+        _ ->
+            err(unexpected_token)
+        end
+    end.
+
+
+toke_string(DF, <<>>, Acc) ->
+    {Data, DF2} = must_df(DF, unterminated_string),
+    toke_string(DF2, Data, Acc);
+toke_string(DF, <<$\\,$",Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$" | Acc]);
+toke_string(DF, <<$\\,$\\,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\\ | Acc]);
+toke_string(DF, <<$\\,$/,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$/ | Acc]);
+toke_string(DF, <<$\\,$b,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\b | Acc]);
+toke_string(DF, <<$\\,$f,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\f | Acc]);
+toke_string(DF, <<$\\,$n,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\n | Acc]);
+toke_string(DF, <<$\\,$r,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\r | Acc]);
+toke_string(DF, <<$\\,$t,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\t | Acc]);
+toke_string(DF, <<$\\,$u,Rest/binary>>, Acc) ->
+    {<<A,B,C,D,Data/binary>>, DF2} = must_df(DF,4,Rest,missing_hex),
+    UTFChar = erlang:list_to_integer([A, B, C, D], 16),
+    if UTFChar == 16#FFFF orelse UTFChar == 16#FFFE ->
+        err(invalid_utf_char);
+    true ->
+        ok
+    end,
+    Chars = xmerl_ucs:to_utf8(UTFChar),
+    toke_string(DF2, Data, lists:reverse(Chars) ++ Acc);
+toke_string(DF, <<$\\>>, Acc) ->
+    {Data, DF2} = must_df(DF, unterminated_string),
+    toke_string(DF2, <<$\\,Data/binary>>, Acc);
+toke_string(_DF, <<$\\, _/binary>>, _Acc) ->
+    err(bad_escape);
+toke_string(DF, <<$", Rest/binary>>, Acc) ->
+    {list_to_binary(lists:reverse(Acc)), DF, Rest};
+toke_string(DF, <<C, Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [C | Acc]).
+
+
+toke_number_leading(DF, <<Digit,Rest/binary>>, Acc)
+        when ?IS_DIGIT(Digit) ->
+    toke_number_leading(DF, Rest, [Digit | Acc]);
+toke_number_leading(DF, <<C,_/binary>>=Rest, Acc)
+        when ?IS_WS(C) orelse ?IS_DELIM(C) ->
+    {list_to_integer(lists:reverse(Acc)), DF, Rest};
+toke_number_leading(DF, <<>>, Acc) ->
+    case DF() of
+    done ->
+         {list_to_integer(lists:reverse(Acc)), fun() -> done end, <<>>};
+    {Data, DF2} ->
+        toke_number_leading(DF2, Data, Acc)
+    end;
+toke_number_leading(DF, <<$., Rest/binary>>, Acc) ->
+    toke_number_trailing(DF, Rest, [$.|Acc]);
+toke_number_leading(DF, <<$e, Rest/binary>>, Acc) ->
+    toke_number_exponent(DF, Rest, [$e, $0, $.|Acc]);
+toke_number_leading(DF, <<$E, Rest/binary>>, Acc) ->
+    toke_number_exponent(DF, Rest, [$e, $0, $.|Acc]);
+toke_number_leading(_, _, _) ->
+    err(unexpected_character_in_number).
+
+toke_number_trailing(DF, <<Digit,Rest/binary>>, Acc)
+        when ?IS_DIGIT(Digit) ->
+    toke_number_trailing(DF, Rest, [Digit | Acc]);
+toke_number_trailing(DF, <<C,_/binary>>=Rest, Acc)
+        when ?IS_WS(C) orelse ?IS_DELIM(C) ->
+    {list_to_float(lists:reverse(Acc)), DF, Rest};
+toke_number_trailing(DF, <<>>, Acc) ->
+    case DF() of
+    done ->
+        {list_to_float(lists:reverse(Acc)), fun() -> done end, <<>>};
+    {Data, DF2} ->
+        toke_number_trailing(DF2, Data, Acc)
+    end;
+toke_number_trailing(DF, <<"e", Rest/binary>>, [C|_]=Acc) when C /= $. ->
+    toke_number_exponent(DF, Rest, [$e|Acc]);
+toke_number_trailing(DF, <<"E", Rest/binary>>, [C|_]=Acc) when C /= $. ->
+    toke_number_exponent(DF, Rest, [$e|Acc]);
+toke_number_trailing(_, _, _) ->
+    err(unexpected_character_in_number).
+
+
+toke_number_exponent(DF, <<Digit,Rest/binary>>, Acc) when ?IS_DIGIT(Digit) ->
+    toke_number_exponent(DF, Rest, [Digit | Acc]);
+toke_number_exponent(DF, <<Sign,Rest/binary>>, [$e|_]=Acc)
+        when Sign == $+ orelse Sign == $- ->
+    toke_number_exponent(DF, Rest, [Sign | Acc]);
+toke_number_exponent(DF, <<C,_/binary>>=Rest, Acc)
+        when ?IS_WS(C) orelse ?IS_DELIM(C) ->
+    {list_to_float(lists:reverse(Acc)), DF, Rest};
+toke_number_exponent(DF, <<>>, Acc) ->
+    case DF() of
+    done ->
+        {list_to_float(lists:reverse(Acc)), fun() -> done end, <<>>};
+    {Data, DF2} ->
+        toke_number_exponent(DF2, Data, Acc)
+    end;
+toke_number_exponent(_, _, _) ->
+    err(unexpected_character_in_number).
+
+
+err(Error)->
+    throw({parse_error,Error}).
+
+
+make_ejson([], Stack) ->
+    Stack;
+make_ejson([array_start | RevEvs], [ArrayValues, PrevValues | RestStack]) ->
+    make_ejson(RevEvs, [[ArrayValues | PrevValues] | RestStack]);
+make_ejson([array_end | RevEvs], Stack) ->
+    make_ejson(RevEvs, [[] | Stack]);
+make_ejson([object_start | RevEvs], [ObjValues, PrevValues | RestStack]) ->
+    make_ejson(RevEvs, [[{ObjValues} | PrevValues] | RestStack]);
+make_ejson([object_end | RevEvs], Stack) ->
+    make_ejson(RevEvs, [[] | Stack]);
+make_ejson([{key, String} | RevEvs], [[PrevValue|RestObject] | RestStack] = _Stack) ->
+    make_ejson(RevEvs, [[{String, PrevValue}|RestObject] | RestStack]);
+make_ejson([Value | RevEvs], [Vals | RestStack] = _Stack) ->
+    make_ejson(RevEvs, [[Value | Vals] | RestStack]).
+    
+collect_events(get_results, Acc) ->
+    Acc;
+collect_events(Ev, Acc) ->
+    fun(NextEv) -> collect_events(NextEv, [Ev | Acc]) end.
+
+
+collect_object(object_end, 0, ReturnControl, Acc) ->
+    [[Obj]] = make_ejson([object_end | Acc], [[]]),
+    ReturnControl(Obj);
+collect_object(object_end, NestCount, ReturnControl, Acc) ->
+    fun(Ev) -> collect_object(Ev, NestCount - 1, ReturnControl, 
+            [object_end | Acc]) end;
+collect_object(object_start, NestCount, ReturnControl, Acc) ->
+    fun(Ev) -> collect_object(Ev, NestCount + 1, ReturnControl, 
+            [object_start | Acc]) end;
+collect_object(Ev, NestCount, ReturnControl, Acc) ->
+    fun(Ev2) -> collect_object(Ev2, NestCount, ReturnControl, 
+            [Ev | Acc]) end.
+    
+
+%% testing constructs borrowed from the Yaws JSON implementation.
+
+%% Create an object from a list of Key/Value pairs.
+
+obj_new() ->
+    {[]}.
+
+is_obj({Props}) ->
+    F = fun ({K, _}) when is_binary(K) ->
+                true;
+            (_) ->
+                false
+        end,
+    lists:all(F, Props).
+
+obj_from_list(Props) ->
+    Obj = {Props},
+    case is_obj(Obj) of
+        true -> Obj;
+        false -> exit({json_bad_object, Obj})
+    end.
+
+%% Test for equivalence of Erlang terms.
+%% Due to arbitrary order of construction, equivalent objects might
+%% compare unequal as erlang terms, so we need to carefully recurse
+%% through aggregates (tuples and objects).
+
+equiv({Props1}, {Props2}) ->
+    equiv_object(Props1, Props2);
+equiv(L1, L2) when is_list(L1), is_list(L2) ->
+    equiv_list(L1, L2);
+equiv(N1, N2) when is_number(N1), is_number(N2) -> N1 == N2;
+equiv(B1, B2) when is_binary(B1), is_binary(B2) -> B1 == B2;
+equiv(true, true) -> true;
+equiv(false, false) -> true;
+equiv(null, null) -> true.
+
+%% Object representation and traversal order is unknown.
+%% Use the sledgehammer and sort property lists.
+
+equiv_object(Props1, Props2) ->
+    L1 = lists:keysort(1, Props1),
+    L2 = lists:keysort(1, Props2),
+    Pairs = lists:zip(L1, L2),
+    true = lists:all(fun({{K1, V1}, {K2, V2}}) ->
+                             equiv(K1, K2) and equiv(V1, V2)
+                     end, Pairs).
+
+%% Recursively compare tuple elements for equivalence.
+
+equiv_list([], []) ->
+    true;
+equiv_list([V1 | L1], [V2 | L2]) ->
+    equiv(V1, V2) andalso equiv_list(L1, L2).
+
+test_all() ->
+    [1199344435545.0, 1] = to_ejson(<<"[1199344435545.0,1]">>),
+    test_one(e2j_test_vec(utf8), 1).
+
+single_byte_data_fun([]) ->
+    done;
+single_byte_data_fun([H|T]) ->
+    {<<H>>, fun() -> single_byte_data_fun(T) end}.
+
+test_one([], _N) ->
+    %% io:format("~p tests passed~n", [N-1]),
+    ok;
+test_one([{E, J} | Rest], N) ->
+    io:format("[~p] ~p ~p~n", [N, E, J]),
+    true = equiv(E, to_ejson(J)),
+    true = equiv(E, to_ejson(fun() -> single_byte_data_fun(J) end)),
+    test_one(Rest, 1+N).
+
+e2j_test_vec(utf8) ->
+    [
+     {1, "1"},
+     {3.1416, "3.14160"}, %% text representation may truncate, trail zeroes
+     {-1, "-1"},
+     {-3.1416, "-3.14160"},
+     {12.0e10, "1.20000e+11"},
+     {1.234E+10, "1.23400e+10"},
+     {-1.234E-10, "-1.23400e-10"},
+     {10.0, "1.0e+01"},
+     {123.456, "1.23456E+2"},
+     {10.0, "1e1"},
+     {<<"foo">>, "\"foo\""},
+     {<<"foo", 5, "bar">>, "\"foo\\u0005bar\""},
+     {<<"">>, "\"\""},
+     {<<"\n\n\n">>, "\"\\n\\n\\n\""},
+     {<<"\" \b\f\r\n\t\"">>, "\"\\\" \\b\\f\\r\\n\\t\\\"\""},
+     {obj_new(), "{}"},
+     {obj_from_list([{<<"foo">>, <<"bar">>}]), "{\"foo\":\"bar\"}"},
+     {obj_from_list([{<<"foo">>, <<"bar">>}, {<<"baz">>, 123}]),
+      "{\"foo\":\"bar\",\"baz\":123}"},
+     {[], "[]"},
+     {[[]], "[[]]"},
+     {[1, <<"foo">>], "[1,\"foo\"]"},
+
+     %% json array in a json object
+     {obj_from_list([{<<"foo">>, [123]}]),
+      "{\"foo\":[123]}"},
+
+     %% json object in a json object
+     {obj_from_list([{<<"foo">>, obj_from_list([{<<"bar">>, true}])}]),
+      "{\"foo\":{\"bar\":true}}"},
+
+     %% fold evaluation order
+     {obj_from_list([{<<"foo">>, []},
+                     {<<"bar">>, obj_from_list([{<<"baz">>, true}])},
+                     {<<"alice">>, <<"bob">>}]),
+      "{\"foo\":[],\"bar\":{\"baz\":true},\"alice\":\"bob\"}"},
+
+     %% json object in a json array
+     {[-123, <<"foo">>, obj_from_list([{<<"bar">>, []}]), null],
+      "[-123,\"foo\",{\"bar\":[]},null]"}
+    ].