You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by jc...@apache.org on 2009/03/13 23:15:35 UTC
svn commit: r753448 [2/2] - in /couchdb/trunk: share/ share/www/script/
share/www/script/test/ src/couchdb/
Modified: couchdb/trunk/src/couchdb/couch_httpd.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd.erl?rev=753448&r1=753447&r2=753448&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd.erl Fri Mar 13 22:15:34 2009
@@ -16,7 +16,7 @@
-export([start_link/0, stop/0, handle_request/4]).
-export([header_value/2,header_value/3,qs_value/2,qs_value/3,qs/1,path/1,absolute_uri/2]).
--export([verify_is_server_admin/1,unquote/1,quote/1,recv/2,recv_chunked/4]).
+-export([verify_is_server_admin/1,unquote/1,quote/1,recv/2,recv_chunked/4,error_info/1]).
-export([parse_form/1,json_body/1,body/1,doc_etag/1, make_etag/1, etag_respond/3]).
-export([primary_header_value/2,partition/1,serve_file/3]).
-export([start_chunked_response/3,send_chunk/2]).
@@ -166,7 +166,7 @@
catch
throw:Error ->
send_error(HttpReq, Error);
- Tag:Error ->
+ Tag:Error when Error ==foo ->
?LOG_ERROR("Uncaught error in HTTP request: ~p",[{Tag, Error}]),
?LOG_DEBUG("Stacktrace: ~p",[erlang:get_stacktrace()]),
send_error(HttpReq, Error)
@@ -295,8 +295,8 @@
json_body(Httpd) ->
?JSON_DECODE(body(Httpd)).
-doc_etag(#doc{revs=[DiskRev|_]}) ->
- "\"" ++ binary_to_list(DiskRev) ++ "\"".
+doc_etag(#doc{revs={Start, [DiskRev|_]}}) ->
+ "\"" ++ ?b2l(couch_doc:rev_to_str({Start, DiskRev})) ++ "\"".
make_etag(Term) ->
<<SigInt:128/integer>> = erlang:md5(term_to_binary(Term)),
@@ -392,75 +392,55 @@
send_chunk(Resp, []).
-send_error(Req, bad_request) ->
- send_error(Req, 400, <<"bad_request">>, <<>>);
-send_error(Req, {query_parse_error, Reason}) ->
- send_error(Req, 400, <<"query_parse_error">>, Reason);
-send_error(Req, {bad_request, Reason}) ->
- send_error(Req, 400, <<"bad_request">>, Reason);
-send_error(Req, not_found) ->
- send_error(Req, 404, <<"not_found">>, <<"Missing">>);
-send_error(Req, {not_found, Reason}) ->
- send_error(Req, 404, <<"not_found">>, Reason);
-send_error(Req, conflict) ->
- send_error(Req, 409, <<"conflict">>, <<"Document update conflict.">>);
-send_error(Req, {invalid_doc, Reason}) ->
- send_error(Req, 400, <<"invalid_doc">>, Reason);
-send_error(Req, {forbidden, Msg}) ->
- send_json(Req, 403,
- {[{<<"error">>, <<"forbidden">>},
- {<<"reason">>, Msg}]});
-send_error(Req, {unauthorized, Msg}) ->
- case couch_config:get("httpd", "WWW-Authenticate", nil) of
- nil ->
- Headers = [];
- Type ->
- Headers = [{"WWW-Authenticate", Type}]
- end,
- send_json(Req, 401, Headers,
- {[{<<"error">>, <<"unauthorized">>},
- {<<"reason">>, Msg}]});
-send_error(Req, {http_error, Code, Headers, Error, Reason}) ->
- send_json(Req, Code, Headers,
- {[{<<"error">>, Error}, {<<"reason">>, Reason}]});
-send_error(Req, {user_error, {Props}}) ->
- {Headers} = proplists:get_value(<<"headers">>, Props, {[]}),
- send_json(Req,
- proplists:get_value(<<"http_status">>, Props, 500),
- Headers,
- {[{<<"error">>, proplists:get_value(<<"error">>, Props)},
- {<<"reason">>, proplists:get_value(<<"reason">>, Props)}]});
-send_error(Req, file_exists) ->
- send_error(Req, 412, <<"file_exists">>, <<"The database could not be "
- "created, the file already exists.">>);
-send_error(Req, {Error, Reason}) ->
- send_error(Req, 500, Error, Reason);
-send_error(Req, Error) ->
- send_error(Req, 500, <<"error">>, Error).
+error_info(bad_request) ->
+ {400, <<"bad_request">>, <<>>};
+error_info({bad_request, Reason}) ->
+ {400, <<"bad_request">>, Reason};
+error_info({query_parse_error, Reason}) ->
+ {400, <<"query_parse_error">>, Reason};
+error_info(not_found) ->
+ {404, <<"not_found">>, <<"Missing">>};
+error_info({not_found, Reason}) ->
+ {404, <<"not_found">>, Reason};
+error_info(conflict) ->
+ {409, <<"conflict">>, <<"Document update conflict.">>};
+error_info({forbidden, Msg}) ->
+ {403, <<"forbidden">>, Msg};
+error_info({unauthorized, Msg}) ->
+ {401, <<"unauthorized">>, Msg};
+error_info(file_exists) ->
+ {412, <<"file_exists">>, <<"The database could not be "
+ "created, the file already exists.">>};
+error_info({Error, Reason}) ->
+ {500, couch_util:to_binary(Error), couch_util:to_binary(Reason)};
+error_info(Error) ->
+ {500, <<"unknown_error">>, couch_util:to_binary(Error)}.
+send_error(Req, Error) ->
+ {Code, ErrorStr, ReasonStr} = error_info(Error),
+ if Code == 401 ->
+ case couch_config:get("httpd", "WWW-Authenticate", nil) of
+ nil ->
+ Headers = [];
+ Type ->
+ Headers = [{"WWW-Authenticate", Type}]
+ end;
+ true ->
+ Headers = []
+ end,
+ send_error(Req, Code, Headers, ErrorStr, ReasonStr).
+send_error(Req, Code, ErrorStr, ReasonStr) ->
+ send_error(Req, Code, [], ErrorStr, ReasonStr).
+
+send_error(Req, Code, Headers, ErrorStr, ReasonStr) ->
+ send_json(Req, Code, Headers,
+ {[{<<"error">>, ErrorStr},
+ {<<"reason">>, ReasonStr}]}).
-send_error(Req, Code, Error, Msg) when is_atom(Error) ->
- send_error(Req, Code, list_to_binary(atom_to_list(Error)), Msg);
-send_error(Req, Code, Error, Msg) when is_list(Msg) ->
- case (catch list_to_binary(Msg)) of
- Bin when is_binary(Bin) ->
- send_error(Req, Code, Error, Bin);
- _ ->
- send_error(Req, Code, Error, io_lib:format("~p", [Msg]))
- end;
-send_error(Req, Code, Error, Msg) when not is_binary(Error) ->
- send_error(Req, Code, list_to_binary(io_lib:format("~p", [Error])), Msg);
-send_error(Req, Code, Error, Msg) when not is_binary(Msg) ->
- send_error(Req, Code, Error, list_to_binary(io_lib:format("~p", [Msg])));
-send_error(Req, Code, Error, <<>>) ->
- send_json(Req, Code, {[{<<"error">>, Error}]});
-send_error(Req, Code, Error, Msg) ->
- send_json(Req, Code, {[{<<"error">>, Error}, {<<"reason">>, Msg}]}).
-
-send_redirect(Req, Path) ->
- Headers = [{"Location", couch_httpd:absolute_uri(Req, Path)}],
- send_response(Req, 301, Headers, <<>>).
+ send_redirect(Req, Path) ->
+ Headers = [{"Location", couch_httpd:absolute_uri(Req, Path)}],
+ send_response(Req, 301, Headers, <<>>).
negotiate_content_type(#httpd{mochi_req=MochiReq}) ->
%% Determine the appropriate Content-Type header for a JSON response
Modified: couchdb/trunk/src/couchdb/couch_httpd_db.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_db.erl?rev=753448&r1=753447&r2=753448&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_db.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_db.erl Fri Mar 13 22:15:34 2009
@@ -22,8 +22,8 @@
-record(doc_query_args, {
options = [],
- rev = "",
- open_revs = ""
+ rev = nil,
+ open_revs = []
}).
% Database request handlers
@@ -89,13 +89,13 @@
db_req(#httpd{method='POST',path_parts=[DbName]}=Req, Db) ->
Doc = couch_doc:from_json_obj(couch_httpd:json_body(Req)),
DocId = couch_util:new_uuid(),
- {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=[]}, []),
+ {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId}, []),
DocUrl = absolute_uri(Req,
binary_to_list(<<"/",DbName/binary,"/",DocId/binary>>)),
send_json(Req, 201, [{"Location", DocUrl}], {[
{ok, true},
{id, DocId},
- {rev, NewRev}
+ {rev, couch_doc:rev_to_str(NewRev)}
]});
db_req(#httpd{path_parts=[_DbName]}=Req, _Db) ->
@@ -131,32 +131,59 @@
<<>> -> couch_util:new_uuid();
Id0 -> Id0
end,
- Revs = case proplists:get_value(<<"_rev">>, ObjProps) of
- undefined -> [];
- Rev -> [Rev]
+ case proplists:get_value(<<"_rev">>, ObjProps) of
+ undefined ->
+ Revs = {0, []};
+ Rev ->
+ {Pos, RevId} = couch_doc:parse_rev(Rev),
+ Revs = {Pos, [RevId]}
end,
Doc#doc{id=Id,revs=Revs}
end,
DocsArray),
- {ok, ResultRevs} = couch_db:update_docs(Db, Docs, Options),
-
- % output the results
- DocResults = lists:zipwith(
- fun(Doc, NewRev) ->
- {[{<<"id">>, Doc#doc.id}, {<<"rev">>, NewRev}]}
- end,
- Docs, ResultRevs),
- send_json(Req, 201, {[
- {ok, true},
- {new_revs, DocResults}
- ]});
-
+ Options2 =
+ case proplists:get_value(<<"all_or_nothing">>, JsonProps) of
+ true -> [all_or_nothing|Options];
+ _ -> Options
+ end,
+ case couch_db:update_docs(Db, Docs, Options2) of
+ {ok, Results} ->
+ % output the results
+ DocResults = lists:zipwith(
+ fun(Doc, {ok, NewRev}) ->
+ {[{<<"id">>, Doc#doc.id}, {<<"rev">>, couch_doc:rev_to_str(NewRev)}]};
+ (Doc, Error) ->
+ {_Code, Err, Msg} = couch_httpd:error_info(Error),
+ % maybe we should add the http error code to the json?
+ {[{<<"id">>, Doc#doc.id}, {<<"error">>, Err}, {"reason", Msg}]}
+ end,
+ Docs, Results),
+ send_json(Req, 201, DocResults);
+ {aborted, Errors} ->
+ ErrorsJson =
+ lists:map(
+ fun({{Id, Rev}, Error}) ->
+ {_Code, Err, Msg} = couch_httpd:error_info(Error),
+ {[{<<"id">>, Id},
+ {<<"rev">>, couch_doc:rev_to_str(Rev)},
+ {<<"error">>, Err},
+ {"reason", Msg}]}
+ end, Errors),
+ send_json(Req, 417, ErrorsJson)
+ end;
false ->
Docs = [couch_doc:from_json_obj(JsonObj) || JsonObj <- DocsArray],
- ok = couch_db:update_docs(Db, Docs, Options, false),
- send_json(Req, 201, {[
- {ok, true}
- ]})
+ {ok, Errors} = couch_db:update_docs(Db, Docs, Options, replicated_changes),
+ ErrorsJson =
+ lists:map(
+ fun({{Id, Rev}, Error}) ->
+ {_Code, Err, Msg} = couch_httpd:error_info(Error),
+ {[{<<"id">>, Id},
+ {<<"rev">>, couch_doc:rev_to_str(Rev)},
+ {<<"error">>, Err},
+ {"reason", Msg}]}
+ end, Errors),
+ send_json(Req, 201, ErrorsJson)
end;
db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) ->
send_method_not_allowed(Req, "POST");
@@ -170,12 +197,12 @@
db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) ->
{IdsRevs} = couch_httpd:json_body(Req),
- % validate the json input
- [{_Id, [_|_]=_Revs} = IdRevs || IdRevs <- IdsRevs],
+ IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs],
- case couch_db:purge_docs(Db, IdsRevs) of
+ case couch_db:purge_docs(Db, IdsRevs2) of
{ok, PurgeSeq, PurgedIdsRevs} ->
- send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs}}]});
+ PurgedIdsRevs2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- PurgedIdsRevs],
+ send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs2}}]});
Error ->
throw(Error)
end;
@@ -204,7 +231,7 @@
{ok, Info} = couch_db:get_db_info(Db),
CurrentEtag = couch_httpd:make_etag(proplists:get_value(update_seq, Info)),
- couch_httpd:etag_respond(Req, CurrentEtag, fun() ->
+ couch_httpd:etag_respond(Req, CurrentEtag, fun() ->
TotalRowCount = proplists:get_value(doc_count, Info),
FoldlFun = couch_httpd_view:make_view_fold_fun(Req, QueryArgs, CurrentEtag, Db,
TotalRowCount, #view_fold_helper_funs{
@@ -227,14 +254,14 @@
deleted_conflict_revs=DelConflictRevs
} = DocInfo,
Json = {
- [{<<"rev">>, Rev}] ++
+ [{<<"rev">>, couch_doc:rev_to_str(Rev)}] ++
case ConflictRevs of
[] -> [];
- _ -> [{<<"conflicts">>, ConflictRevs}]
+ _ -> [{<<"conflicts">>, couch_doc:rev_to_strs(ConflictRevs)}]
end ++
case DelConflictRevs of
[] -> [];
- _ -> [{<<"deleted_conflicts">>, DelConflictRevs}]
+ _ -> [{<<"deleted_conflicts">>, couch_doc:rev_to_strs(DelConflictRevs)}]
end ++
case Deleted of
true -> [{<<"deleted">>, true}];
@@ -251,9 +278,11 @@
db_req(#httpd{method='POST',path_parts=[_,<<"_missing_revs">>]}=Req, Db) ->
{JsonDocIdRevs} = couch_httpd:json_body(Req),
- {ok, Results} = couch_db:get_missing_revs(Db, JsonDocIdRevs),
+ JsonDocIdRevs2 = [{Id, [couch_doc:parse_rev(RevStr) || RevStr <- RevStrs]} || {Id, RevStrs} <- JsonDocIdRevs],
+ {ok, Results} = couch_db:get_missing_revs(Db, JsonDocIdRevs2),
+ Results2 = [{Id, [couch_doc:rev_to_str(Rev) || Rev <- Revs]} || {Id, Revs} <- Results],
send_json(Req, {[
- {missing_revs, {Results}}
+ {missing_revs, {Results2}}
]});
db_req(#httpd{path_parts=[_,<<"_missing_revs">>]}=Req, _Db) ->
@@ -271,6 +300,18 @@
db_req(#httpd{path_parts=[_,<<"_admins">>]}=Req, _Db) ->
send_method_not_allowed(Req, "PUT,GET");
+db_req(#httpd{method='PUT',path_parts=[_,<<"_revs_limit">>]}=Req,
+ Db) ->
+ Limit = couch_httpd:json_body(Req),
+ ok = couch_db:set_revs_limit(Db, Limit),
+ send_json(Req, {[{<<"ok">>, true}]});
+
+db_req(#httpd{method='GET',path_parts=[_,<<"_revs_limit">>]}=Req, Db) ->
+ send_json(Req, couch_db:get_revs_limit(Db));
+
+db_req(#httpd{path_parts=[_,<<"_revs_limit">>]}=Req, _Db) ->
+ send_method_not_allowed(Req, "PUT,GET");
+
% Special case to enable using an unencoded slash in the URL of design docs,
% as slashes in document IDs must otherwise be URL encoded.
db_req(#httpd{method='GET',mochi_req=MochiReq, path_parts=[DbName,<<"_design/",_/binary>>|_]}=Req, _Db) ->
@@ -334,7 +375,7 @@
AdapterFun = fun(#full_doc_info{id=Id}=FullDocInfo, Offset, Acc) ->
case couch_doc:to_doc_info(FullDocInfo) of
#doc_info{deleted=false, rev=Rev} ->
- FoldlFun({{Id, Id}, {[{rev, Rev}]}}, Offset, Acc);
+ FoldlFun({{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}]}}, Offset, Acc);
#doc_info{deleted=true} ->
{ok, Acc}
end
@@ -358,9 +399,9 @@
DocInfo = (catch couch_db:get_doc_info(Db, Key)),
Doc = case DocInfo of
{ok, #doc_info{id=Id, rev=Rev, deleted=false}} = DocInfo ->
- {{Id, Id}, {[{rev, Rev}]}};
+ {{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}]}};
{ok, #doc_info{id=Id, rev=Rev, deleted=true}} = DocInfo ->
- {{Id, Id}, {[{rev, Rev}, {deleted, true}]}};
+ {{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}, {deleted, true}]}};
not_found ->
{{Key, error}, not_found};
_ ->
@@ -381,20 +422,12 @@
-
-
db_doc_req(#httpd{method='DELETE'}=Req, Db, DocId) ->
- case extract_header_rev(Req, couch_httpd:qs_value(Req, "rev")) of
- missing_rev ->
- couch_httpd:send_error(Req, 409, <<"missing_rev">>,
- <<"Document rev/etag must be specified to delete">>);
- RevToDelete ->
- {ok, NewRev} = couch_db:delete_doc(Db, DocId, [RevToDelete]),
- send_json(Req, 200, {[
- {ok, true},
- {id, DocId},
- {rev, NewRev}
- ]})
+ case couch_httpd:qs_value(Req, "rev") of
+ undefined ->
+ update_doc(Req, Db, DocId, {[{<<"_deleted">>,true}]});
+ Rev ->
+ update_doc(Req, Db, DocId, {[{<<"_rev">>, ?l2b(Rev)},{<<"_deleted">>,true}]})
end;
db_doc_req(#httpd{method='GET'}=Req, Db, DocId) ->
@@ -438,82 +471,31 @@
end_json_response(Resp)
end;
-db_doc_req(#httpd{method='POST'}=Req, Db, DocId) ->
- Form = couch_httpd:parse_form(Req),
- Rev = list_to_binary(proplists:get_value("_rev", Form)),
- Doc = case couch_db:open_doc_revs(Db, DocId, [Rev], []) of
- {ok, [{ok, Doc0}]} -> Doc0#doc{revs=[Rev]};
- {ok, [Error]} -> throw(Error)
- end,
-
- NewAttachments = [
- {validate_attachment_name(Name), {list_to_binary(ContentType), Content}} ||
- {Name, {ContentType, _}, Content} <-
- proplists:get_all_values("_attachments", Form)
- ],
- #doc{attachments=Attachments} = Doc,
- NewDoc = Doc#doc{
- attachments = Attachments ++ NewAttachments
- },
- {ok, NewRev} = couch_db:update_doc(Db, NewDoc, []),
-
- send_json(Req, 201, [{"Etag", "\"" ++ NewRev ++ "\""}], {obj, [
- {ok, true},
- {id, DocId},
- {rev, NewRev}
- ]});
-
db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
- Json = couch_httpd:json_body(Req),
- Doc = couch_doc:from_json_obj(Json),
- ExplicitRev =
- case Doc#doc.revs of
- [Rev0|_] -> Rev0;
- [] -> undefined
- end,
- validate_attachment_names(Doc),
- case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of
- "true" ->
- Options = [full_commit];
- _ ->
- Options = []
- end,
- case extract_header_rev(Req, ExplicitRev) of
- missing_rev ->
- Revs = [];
- Rev ->
- Revs = [Rev]
- end,
- {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, Options),
- send_json(Req, 201, [{"Etag", <<"\"", NewRev/binary, "\"">>}], {[
- {ok, true},
- {id, DocId},
- {rev, NewRev}
- ]});
+ update_doc(Req, Db, DocId, couch_httpd:json_body(Req));
db_doc_req(#httpd{method='COPY'}=Req, Db, SourceDocId) ->
SourceRev =
case extract_header_rev(Req, couch_httpd:qs_value(Req, "rev")) of
- missing_rev -> [];
+ missing_rev -> nil;
Rev -> Rev
end,
- {TargetDocId, TargetRev} = parse_copy_destination_header(Req),
+ {TargetDocId, TargetRevs} = parse_copy_destination_header(Req),
% open revision Rev or Current
Doc = couch_doc_open(Db, SourceDocId, SourceRev, []),
-
% save new doc
- {ok, NewTargetRev} = couch_db:update_doc(Db, Doc#doc{id=TargetDocId, revs=TargetRev}, []),
-
- send_json(Req, 201, [{"Etag", "\"" ++ binary_to_list(NewTargetRev) ++ "\""}], {[
- {ok, true},
- {id, TargetDocId},
- {rev, NewTargetRev}
- ]});
+ case couch_db:update_doc(Db, Doc#doc{id=TargetDocId, revs=TargetRevs}, []) of
+ {ok, NewTargetRev} ->
+ send_json(Req, 201, [{"Etag", "\"" ++ ?b2l(couch_doc:rev_to_str(NewTargetRev)) ++ "\""}],
+ update_result_to_json({ok, NewTargetRev}));
+ Error ->
+ throw(Error)
+ end;
db_doc_req(#httpd{method='MOVE'}=Req, Db, SourceDocId) ->
- SourceRev =
+ SourceRev = {SourceRevPos, SourceRevId} =
case extract_header_rev(Req, couch_httpd:qs_value(Req, "rev")) of
missing_rev ->
throw({bad_request, "MOVE requires a specified rev parameter"
@@ -521,37 +503,68 @@
Rev -> Rev
end,
- {TargetDocId, TargetRev} = parse_copy_destination_header(Req),
+ {TargetDocId, TargetRevs} = parse_copy_destination_header(Req),
% open revision Rev or Current
Doc = couch_doc_open(Db, SourceDocId, SourceRev, []),
% save new doc & delete old doc in one operation
Docs = [
- Doc#doc{id=TargetDocId, revs=TargetRev},
- #doc{id=SourceDocId, revs=[SourceRev], deleted=true}
+ #doc{id=SourceDocId, revs={SourceRevPos, [SourceRevId]}, deleted=true},
+ Doc#doc{id=TargetDocId, revs=TargetRevs}
],
- {ok, ResultRevs} = couch_db:update_docs(Db, Docs, []),
+ {ok, [SourceResult, TargetResult]} = couch_db:update_docs(Db, Docs, []),
- DocResults = lists:zipwith(
- fun(FDoc, NewRev) ->
- {[{id, FDoc#doc.id}, {rev, NewRev}]}
- end,
- Docs, ResultRevs),
send_json(Req, 201, {[
- {ok, true},
- {new_revs, DocResults}
+ {SourceDocId, update_result_to_json(SourceResult)},
+ {TargetDocId, update_result_to_json(TargetResult)}
]});
db_doc_req(Req, _Db, _DocId) ->
send_method_not_allowed(Req, "DELETE,GET,HEAD,POST,PUT,COPY,MOVE").
+update_result_to_json({ok, NewRev}) ->
+ {[{rev, couch_doc:rev_to_str(NewRev)}]};
+update_result_to_json(Error) ->
+ {_Code, ErrorStr, Reason} = couch_httpd:error_info(Error),
+ {[{error, ErrorStr}, {reason, Reason}]}.
+
+
+update_doc(Req, Db, DocId, Json) ->
+ #doc{deleted=Deleted} = Doc = couch_doc:from_json_obj(Json),
+ validate_attachment_names(Doc),
+ ExplicitDocRev =
+ case Doc#doc.revs of
+ {Start,[RevId|_]} -> {Start, RevId};
+ _ -> undefined
+ end,
+ case extract_header_rev(Req, ExplicitDocRev) of
+ missing_rev ->
+ Revs = {0, []};
+ {Pos, Rev} ->
+ Revs = {Pos, [Rev]}
+ end,
+
+ case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of
+ "true" ->
+ Options = [full_commit];
+ _ ->
+ Options = []
+ end,
+ {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, Options),
+ NewRevStr = couch_doc:rev_to_str(NewRev),
+ send_json(Req, if Deleted -> 200; true -> 201 end,
+ [{"Etag", <<"\"", NewRevStr/binary, "\"">>}], {[
+ {ok, true},
+ {id, DocId},
+ {rev, NewRevStr}]}).
+
% Useful for debugging
% couch_doc_open(Db, DocId) ->
% couch_doc_open(Db, DocId, [], []).
couch_doc_open(Db, DocId, Rev, Options) ->
case Rev of
- "" -> % open most recent rev
+ nil -> % open most recent rev
case couch_db:open_doc(Db, DocId, Options) of
{ok, Doc} ->
Doc;
@@ -572,13 +585,13 @@
db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) ->
FileName = list_to_binary(mochiweb_util:join(lists:map(fun binary_to_list/1, FileNameParts),"/")),
case couch_db:open_doc(Db, DocId, []) of
- {ok, #doc{attachments=Attachments, revs=[LastRev|_OldRevs]}} ->
+ {ok, #doc{attachments=Attachments}=Doc} ->
case proplists:get_value(FileName, Attachments) of
undefined ->
throw({not_found, "Document is missing attachment"});
{Type, Bin} ->
{ok, Resp} = start_chunked_response(Req, 200, [
- {"ETag", binary_to_list(LastRev)},
+ {"ETag", couch_httpd:doc_etag(Doc)},
{"Cache-Control", "must-revalidate"},
{"Content-Type", binary_to_list(Type)}%,
% My understanding of http://www.faqs.org/rfcs/rfc2616.html
@@ -640,7 +653,7 @@
#doc{id=DocId};
Rev ->
case couch_db:open_doc_revs(Db, DocId, [Rev], []) of
- {ok, [{ok, Doc0}]} -> Doc0#doc{revs=[Rev]};
+ {ok, [{ok, Doc0}]} -> Doc0;
{ok, [Error]} -> throw(Error)
end
end,
@@ -653,7 +666,7 @@
send_json(Req, case Method of 'DELETE' -> 200; _ -> 201 end, {[
{ok, true},
{id, DocId},
- {rev, UpdatedRev}
+ {rev, couch_doc:rev_to_str(UpdatedRev)}
]});
db_attachment_req(Req, _Db, _DocId, _FileNameParts) ->
@@ -682,25 +695,24 @@
Options = [deleted_conflicts | Args#doc_query_args.options],
Args#doc_query_args{options=Options};
{"rev", Rev} ->
- Args#doc_query_args{rev=list_to_binary(Rev)};
+ Args#doc_query_args{rev=couch_doc:parse_rev(Rev)};
{"open_revs", "all"} ->
Args#doc_query_args{open_revs=all};
{"open_revs", RevsJsonStr} ->
JsonArray = ?JSON_DECODE(RevsJsonStr),
- Args#doc_query_args{open_revs=JsonArray};
+ Args#doc_query_args{open_revs=[couch_doc:parse_rev(Rev) || Rev <- JsonArray]};
_Else -> % unknown key value pair, ignore.
Args
end
end, #doc_query_args{}, couch_httpd:qs(Req)).
-
-extract_header_rev(Req, ExplicitRev) when is_list(ExplicitRev)->
- extract_header_rev(Req, list_to_binary(ExplicitRev));
+extract_header_rev(Req, ExplicitRev) when is_binary(ExplicitRev) or is_list(ExplicitRev)->
+ extract_header_rev(Req, couch_doc:parse_rev(ExplicitRev));
extract_header_rev(Req, ExplicitRev) ->
Etag = case couch_httpd:header_value(Req, "If-Match") of
undefined -> undefined;
- Value -> list_to_binary(string:strip(Value, both, $"))
+ Value -> couch_doc:parse_rev(string:strip(Value, both, $"))
end,
case {ExplicitRev, Etag} of
{undefined, undefined} -> missing_rev;
@@ -716,11 +728,12 @@
Destination = couch_httpd:header_value(Req, "Destination"),
case regexp:match(Destination, "\\?") of
nomatch ->
- {list_to_binary(Destination), []};
+ {list_to_binary(Destination), {0, []}};
{match, _, _} ->
{ok, [DocId, RevQueryOptions]} = regexp:split(Destination, "\\?"),
{ok, [_RevQueryKey, Rev]} = regexp:split(RevQueryOptions, "="),
- {list_to_binary(DocId), [list_to_binary(Rev)]}
+ {Pos, RevId} = couch_doc:parse_rev(Rev),
+ {list_to_binary(DocId), {Pos, [RevId]}}
end.
validate_attachment_names(Doc) ->
Modified: couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl?rev=753448&r1=753447&r2=753448&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl Fri Mar 13 22:15:34 2009
@@ -70,35 +70,36 @@
handle_task_status_req(Req) ->
send_method_not_allowed(Req, "GET,HEAD").
+% add trailing slash if missing
+fix_db_url(UrlBin) ->
+ ?l2b(case lists:last(Url = ?b2l(UrlBin)) of
+ $/ -> Url;
+ _ -> Url ++ "/"
+ end).
+
+
+get_rep_endpoint(_Req, {Props}) ->
+ Url = proplists:get_value(<<"url">>, Props),
+ {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}),
+ {remote, fix_db_url(Url), [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
+get_rep_endpoint(_Req, <<"http://",_/binary>>=Url) ->
+ {remote, fix_db_url(Url), []};
+get_rep_endpoint(_Req, <<"https://",_/binary>>=Url) ->
+ {remote, fix_db_url(Url), []};
+get_rep_endpoint(#httpd{user_ctx=UserCtx}, <<DbName/binary>>) ->
+ {local, DbName, UserCtx}.
-handle_replicate_req(#httpd{user_ctx=UserCtx,method='POST'}=Req) ->
+handle_replicate_req(#httpd{method='POST'}=Req) ->
{Props} = couch_httpd:json_body(Req),
- Source = proplists:get_value(<<"source">>, Props),
- Target = proplists:get_value(<<"target">>, Props),
-
- {SrcOpts} = proplists:get_value(<<"source_options">>, Props, {[]}),
- {SrcHeadersBinary} = proplists:get_value(<<"headers">>, SrcOpts, {[]}),
- SrcHeaders = [{?b2l(K),(V)} || {K,V} <- SrcHeadersBinary],
-
- {TgtOpts} = proplists:get_value(<<"target_options">>, Props, {[]}),
- {TgtHeadersBinary} = proplists:get_value(<<"headers">>, TgtOpts, {[]}),
- TgtHeaders = [{?b2l(K),(V)} || {K,V} <- TgtHeadersBinary],
-
- {Options} = proplists:get_value(<<"options">>, Props, {[]}),
- Options2 = [{source_options,
- [{headers, SrcHeaders},
- {user_ctx, UserCtx}]},
- {target_options,
- [{headers, TgtHeaders},
- {user_ctx, UserCtx}]}
- | Options],
- case couch_rep:replicate(Source, Target, Options2) of
- {ok, {JsonResults}} ->
- send_json(Req, {[{ok, true} | JsonResults]});
- {error, {Type, Details}} ->
- send_json(Req, 500, {[{error, Type}, {reason, Details}]});
- {error, Reason} ->
- send_json(Req, 500, {[{error, Reason}]})
+ Source = get_rep_endpoint(Req, proplists:get_value(<<"source">>, Props)),
+ Target = get_rep_endpoint(Req, proplists:get_value(<<"target">>, Props)),
+ case couch_rep:replicate(Source, Target) of
+ {ok, {JsonResults}} ->
+ send_json(Req, {[{ok, true} | JsonResults]});
+ {error, {Type, Details}} ->
+ send_json(Req, 500, {[{error, Type}, {reason, Details}]});
+ {error, Reason} ->
+ send_json(Req, 500, {[{error, Reason}]})
end;
handle_replicate_req(Req) ->
send_method_not_allowed(Req, "POST").
Modified: couchdb/trunk/src/couchdb/couch_httpd_show.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_show.erl?rev=753448&r1=753447&r2=753448&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_show.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_show.erl Fri Mar 13 22:15:34 2009
@@ -27,10 +27,10 @@
path_parts=[_DbName, _Design, DesignName, _Show, ShowName, DocId]
}=Req, Db) ->
DesignId = <<"_design/", DesignName/binary>>,
- #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, [], []),
+ #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
ShowSrc = get_nested_json_value({Props}, [<<"shows">>, ShowName]),
- Doc = try couch_httpd_db:couch_doc_open(Db, DocId, [], []) of
+ Doc = try couch_httpd_db:couch_doc_open(Db, DocId, nil, []) of
FoundDoc -> FoundDoc
catch
_ -> nil
@@ -42,7 +42,7 @@
path_parts=[_DbName, _Design, DesignName, _Show, ShowName]
}=Req, Db) ->
DesignId = <<"_design/", DesignName/binary>>,
- #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, [], []),
+ #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
ShowSrc = get_nested_json_value({Props}, [<<"shows">>, ShowName]),
send_doc_show_response(Lang, ShowSrc, nil, nil, Req, Db);
@@ -56,7 +56,7 @@
handle_view_list_req(#httpd{method='GET',
path_parts=[_DbName, _Design, DesignName, _List, ListName, ViewName]}=Req, Db) ->
DesignId = <<"_design/", DesignName/binary>>,
- #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, [], []),
+ #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
ListSrc = get_nested_json_value({Props}, [<<"lists">>, ListName]),
send_view_list_response(Lang, ListSrc, ViewName, DesignId, Req, Db, nil);
@@ -67,7 +67,7 @@
handle_view_list_req(#httpd{method='POST',
path_parts=[_DbName, _Design, DesignName, _List, ListName, ViewName]}=Req, Db) ->
DesignId = <<"_design/", DesignName/binary>>,
- #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, [], []),
+ #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
ListSrc = get_nested_json_value({Props}, [<<"lists">>, ListName]),
ReqBody = couch_httpd:body(Req),
@@ -370,13 +370,12 @@
couch_httpd_external:send_external_response(Req, JsonResp)
end);
-send_doc_show_response(Lang, ShowSrc, DocId, #doc{revs=[DocRev|_]}=Doc,
- #httpd{mochi_req=MReq}=Req, Db) ->
+send_doc_show_response(Lang, ShowSrc, DocId, #doc{revs=Revs}=Doc, #httpd{mochi_req=MReq}=Req, Db) ->
% calculate the etag
Headers = MReq:get(headers),
Hlist = mochiweb_headers:to_list(Headers),
Accept = proplists:get_value('Accept', Hlist),
- CurrentEtag = couch_httpd:make_etag({Lang, ShowSrc, DocRev, Accept}),
+ CurrentEtag = couch_httpd:make_etag({Lang, ShowSrc, Revs, Accept}),
% We know our etag now
couch_httpd:etag_respond(Req, CurrentEtag, fun() ->
ExternalResp = couch_query_servers:render_doc_show(Lang, ShowSrc,
Modified: couchdb/trunk/src/couchdb/couch_httpd_view.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_view.erl?rev=753448&r1=753447&r2=753448&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_view.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_view.erl Fri Mar 13 22:15:34 2009
@@ -565,14 +565,14 @@
true ->
Rev = case Value of
{Props} ->
- case is_list(Props) of
- true ->
- proplists:get_value(<<"_rev">>, Props, []);
- _ ->
- []
+ case proplists:get_value(<<"_rev">>, Props) of
+ undefined ->
+ nil;
+ Rev0 ->
+ couch_doc:parse_rev(Rev0)
end;
_ ->
- []
+ nil
end,
?LOG_DEBUG("Include Doc: ~p ~p", [DocId, Rev]),
case (catch couch_httpd_db:couch_doc_open(Db, DocId, Rev, [])) of
Modified: couchdb/trunk/src/couchdb/couch_key_tree.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_key_tree.erl?rev=753448&r1=753447&r2=753448&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_key_tree.erl (original)
+++ couchdb/trunk/src/couchdb/couch_key_tree.erl Fri Mar 13 22:15:34 2009
@@ -13,7 +13,8 @@
-module(couch_key_tree).
-export([merge/2, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]).
--export([map/2, get_all_leafs/1, get_leaf_keys/1, count_leafs/1, remove_leafs/2,get_all_leafs_full/1]).
+-export([map/2, get_all_leafs/1, count_leafs/1, remove_leafs/2,
+ get_all_leafs_full/1,stem/2,test/0]).
% a key tree looks like this:
% Tree -> [] or [{Key, Value, ChildTree} | SiblingTree]
@@ -22,70 +23,150 @@
% And each Key < SiblingKey
+% partial trees arranged by how much they are cut off.
-% key tree functions
+merge(A, B) ->
+ {Merged, HasConflicts} =
+ lists:foldl(
+ fun(InsertTree, {AccTrees, AccConflicts}) ->
+ case merge_one(AccTrees, InsertTree, [], false) of
+ {ok, Merged, Conflicts} ->
+ {Merged, Conflicts or AccConflicts};
+ no ->
+ {[InsertTree | AccTrees], true}
+ end
+ end,
+ {A, false}, B),
+ if HasConflicts or
+ ((length(Merged) /= length(A)) and (length(Merged) /= length(B))) ->
+ Conflicts = conflicts;
+ true ->
+ Conflicts = no_conflicts
+ end,
+ {lists:sort(Merged), Conflicts}.
-% When the same key is found in the trees, the value in tree B is discarded.
-merge([], B) ->
- B;
-merge(A, []) ->
- A;
-merge([ATree | ANextTree], [BTree | BNextTree]) ->
+merge_one([], Insert, OutAcc, ConflictsAcc) ->
+ {ok, [Insert | OutAcc], ConflictsAcc};
+merge_one([{Start, Tree}|Rest], {StartInsert, TreeInsert}, OutAcc, ConflictsAcc) ->
+ if Start =< StartInsert ->
+ StartA = Start,
+ StartB = StartInsert,
+ TreeA = Tree,
+ TreeB = TreeInsert;
+ true ->
+ StartB = Start,
+ StartA = StartInsert,
+ TreeB = Tree,
+ TreeA = TreeInsert
+ end,
+ case merge_at([TreeA], StartB - StartA, TreeB) of
+ {ok, [CombinedTrees], Conflicts} ->
+ merge_one(Rest, {StartA, CombinedTrees}, OutAcc, Conflicts or ConflictsAcc);
+ no ->
+ merge_one(Rest, {StartB, TreeB}, [{StartA, TreeA} | OutAcc], ConflictsAcc)
+ end.
+
+merge_at([], _Place, _Insert) ->
+ no;
+merge_at([{Key, Value, SubTree}|Sibs], 0, {InsertKey, InsertValue, InsertSubTree}) ->
+ if Key == InsertKey ->
+ {Merge, Conflicts} = merge_simple(SubTree, InsertSubTree),
+ {ok, [{Key, Value, Merge} | Sibs], Conflicts};
+ true ->
+ case merge_at(Sibs, 0, {InsertKey, InsertValue, InsertSubTree}) of
+ {ok, Merged, Conflicts} ->
+ {ok, [{Key, Value, SubTree} | Merged], Conflicts};
+ no ->
+ no
+ end
+ end;
+merge_at([{Key, Value, SubTree}|Sibs], Place, Insert) ->
+ case merge_at(SubTree, Place - 1,Insert) of
+ {ok, Merged, Conflicts} ->
+ {ok, [{Key, Value, Merged} | Sibs], Conflicts};
+ no ->
+ case merge_at(Sibs, Place, Insert) of
+ {ok, Merged} ->
+ [{Key, Value, SubTree} | Merged];
+ no ->
+ no
+ end
+ end.
+
+% key tree functions
+merge_simple([], B) ->
+ {B, false};
+merge_simple(A, []) ->
+ {A, false};
+merge_simple([ATree | ANextTree], [BTree | BNextTree]) ->
{AKey, AValue, ASubTree} = ATree,
{BKey, _BValue, BSubTree} = BTree,
if
AKey == BKey ->
%same key
- MergedSubTree = merge(ASubTree, BSubTree),
- MergedNextTree = merge(ANextTree, BNextTree),
- [{AKey, AValue, MergedSubTree} | MergedNextTree];
+ {MergedSubTree, Conflict1} = merge_simple(ASubTree, BSubTree),
+ {MergedNextTree, Conflict2} = merge_simple(ANextTree, BNextTree),
+ {[{AKey, AValue, MergedSubTree} | MergedNextTree], Conflict1 or Conflict2};
AKey < BKey ->
- [ATree | merge(ANextTree, [BTree | BNextTree])];
+ {MTree, _} = merge_simple(ANextTree, [BTree | BNextTree]),
+ {[ATree | MTree], true};
true ->
- [BTree | merge([ATree | ANextTree], BNextTree)]
+ {MTree, _} = merge_simple([ATree | ANextTree], BNextTree),
+ {[BTree | MTree], true}
end.
find_missing(_Tree, []) ->
[];
-find_missing([], Keys) ->
- Keys;
-find_missing([{Key, _, SubTree} | RestTree], Keys) ->
- SrcKeys2 = Keys -- [Key],
- SrcKeys3 = find_missing(SubTree, SrcKeys2),
- find_missing(RestTree, SrcKeys3).
+find_missing([], SeachKeys) ->
+ SeachKeys;
+find_missing([{Start, {Key, Value, SubTree}} | RestTree], SeachKeys) ->
+ PossibleKeys = [{KeyPos, KeyValue} || {KeyPos, KeyValue} <- SeachKeys, KeyPos >= Start],
+ ImpossibleKeys = [{KeyPos, KeyValue} || {KeyPos, KeyValue} <- SeachKeys, KeyPos < Start],
+ Missing = find_missing_simple(Start, [{Key, Value, SubTree}], PossibleKeys),
+ find_missing(RestTree, ImpossibleKeys ++ Missing).
+
+find_missing_simple(_Pos, _Tree, []) ->
+ [];
+find_missing_simple(_Pos, [], SeachKeys) ->
+ SeachKeys;
+find_missing_simple(Pos, [{Key, _, SubTree} | RestTree], SeachKeys) ->
+ PossibleKeys = [{KeyPos, KeyValue} || {KeyPos, KeyValue} <- SeachKeys, KeyPos >= Pos],
+ ImpossibleKeys = [{KeyPos, KeyValue} || {KeyPos, KeyValue} <- SeachKeys, KeyPos < Pos],
+
+ SrcKeys2 = PossibleKeys -- [{Pos, Key}],
+ SrcKeys3 = find_missing_simple(Pos + 1, SubTree, SrcKeys2),
+ ImpossibleKeys ++ find_missing_simple(Pos, RestTree, SrcKeys3).
-get_all_key_paths_rev([], KeyPathAcc) ->
- KeyPathAcc;
-get_all_key_paths_rev([{Key, Value, SubTree} | RestTree], KeyPathAcc) ->
- get_all_key_paths_rev(SubTree, [{Key, Value} | KeyPathAcc]) ++
- get_all_key_paths_rev(RestTree, KeyPathAcc).
-
+filter_leafs([], _Keys, FilteredAcc, RemovedKeysAcc) ->
+ {FilteredAcc, RemovedKeysAcc};
+filter_leafs([{Pos, [{LeafKey, _}|_]} = Path |Rest], Keys, FilteredAcc, RemovedKeysAcc) ->
+ FilteredKeys = lists:delete({Pos, LeafKey}, Keys),
+ if FilteredKeys == Keys ->
+ % this leaf is not a key we are looking to remove
+ filter_leafs(Rest, Keys, [Path | FilteredAcc], RemovedKeysAcc);
+ true ->
+ % this did match a key, remove both the node and the input key
+ filter_leafs(Rest, FilteredKeys, FilteredAcc, [{Pos, LeafKey} | RemovedKeysAcc])
+ end.
% Removes any branches from the tree whose leaf node(s) are in the Keys
-remove_leafs(Tree, Keys) ->
+remove_leafs(Trees, Keys) ->
% flatten each branch in a tree into a tree path
- Paths = get_all_key_paths_rev(Tree, []),
+ Paths = get_all_leafs_full(Trees),
% filter out any that are in the keys list.
- {FoundKeys, FilteredPaths} = lists:mapfoldl(
- fun(Key, PathsAcc) ->
- case [Path || [{LeafKey,_}|_]=Path <- PathsAcc, LeafKey /= Key] of
- PathsAcc ->
- {nil, PathsAcc};
- PathsAcc2 ->
- {Key, PathsAcc2}
- end
- end, Paths, Keys),
-
+ {FilteredPaths, RemovedKeys} = filter_leafs(Paths, Keys, [], []),
+
% convert paths back to trees
NewTree = lists:foldl(
- fun(Path,TreeAcc) ->
- SingleTree = lists:foldl(
+ fun({PathPos, Path},TreeAcc) ->
+ [SingleTree] = lists:foldl(
fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path),
- merge(TreeAcc, SingleTree)
+ {NewTrees, _} = merge(TreeAcc, [{PathPos + 1 - length(Path), SingleTree}]),
+ NewTrees
end, [], FilteredPaths),
- {NewTree, FoundKeys}.
+ {NewTree, RemovedKeys}.
% get the leafs in the tree matching the keys. The matching key nodes can be
@@ -94,87 +175,211 @@
get_key_leafs(Tree, Keys) ->
get_key_leafs(Tree, Keys, []).
-get_key_leafs(_Tree, [], _KeyPathAcc) ->
+get_key_leafs(_, [], Acc) ->
+ {Acc, []};
+get_key_leafs([], Keys, Acc) ->
+ {Acc, Keys};
+get_key_leafs([{Pos, Tree}|Rest], Keys, Acc) ->
+ {Gotten, RemainingKeys} = get_key_leafs_simple(Pos, [Tree], Keys, []),
+ get_key_leafs(Rest, RemainingKeys, Gotten ++ Acc).
+
+get_key_leafs_simple(_Pos, _Tree, [], _KeyPathAcc) ->
{[], []};
-get_key_leafs([], KeysToGet, _KeyPathAcc) ->
+get_key_leafs_simple(_Pos, [], KeysToGet, _KeyPathAcc) ->
{[], KeysToGet};
-get_key_leafs([{Key, _Value, SubTree}=Tree | RestTree], KeysToGet, KeyPathAcc) ->
- case KeysToGet -- [Key] of
+get_key_leafs_simple(Pos, [{Key, _Value, SubTree}=Tree | RestTree], KeysToGet, KeyPathAcc) ->
+ case lists:delete({Pos, Key}, KeysToGet) of
KeysToGet -> % same list, key not found
- {LeafsFound, KeysToGet2} = get_key_leafs(SubTree, KeysToGet, [Key | KeyPathAcc]),
- {RestLeafsFound, KeysRemaining} = get_key_leafs(RestTree, KeysToGet2, KeyPathAcc),
+ {LeafsFound, KeysToGet2} = get_key_leafs_simple(Pos + 1, SubTree, KeysToGet, [Key | KeyPathAcc]),
+ {RestLeafsFound, KeysRemaining} = get_key_leafs_simple(Pos, RestTree, KeysToGet2, KeyPathAcc),
{LeafsFound ++ RestLeafsFound, KeysRemaining};
KeysToGet2 ->
- LeafsFound = get_all_leafs([Tree], KeyPathAcc),
+ LeafsFound = get_all_leafs_simple(Pos, [Tree], KeyPathAcc),
LeafKeysFound = [LeafKeyFound || {LeafKeyFound, _, _} <- LeafsFound],
KeysToGet2 = KeysToGet2 -- LeafKeysFound,
- {RestLeafsFound, KeysRemaining} = get_key_leafs(RestTree, KeysToGet2, KeyPathAcc),
+ {RestLeafsFound, KeysRemaining} = get_key_leafs_simple(Pos, RestTree, KeysToGet2, KeyPathAcc),
{LeafsFound ++ RestLeafsFound, KeysRemaining}
end.
get(Tree, KeysToGet) ->
{KeyPaths, KeysNotFound} = get_full_key_paths(Tree, KeysToGet),
- FixedResults = [ {Key, Value, [Key0 || {Key0, _} <- Path]} || [{Key, Value}|_] = Path <- KeyPaths],
+ FixedResults = [ {Value, {Pos, [Key0 || {Key0, _} <- Path]}} || {Pos, [{_Key, Value}|_]=Path} <- KeyPaths],
{FixedResults, KeysNotFound}.
get_full_key_paths(Tree, Keys) ->
get_full_key_paths(Tree, Keys, []).
-get_full_key_paths(_Tree, [], _KeyPathAcc) ->
+get_full_key_paths(_, [], Acc) ->
+ {Acc, []};
+get_full_key_paths([], Keys, Acc) ->
+ {Acc, Keys};
+get_full_key_paths([{Pos, Tree}|Rest], Keys, Acc) ->
+ {Gotten, RemainingKeys} = get_full_key_paths(Pos, [Tree], Keys, []),
+ get_full_key_paths(Rest, RemainingKeys, Gotten ++ Acc).
+
+
+get_full_key_paths(_Pos, _Tree, [], _KeyPathAcc) ->
{[], []};
-get_full_key_paths([], KeysToGet, _KeyPathAcc) ->
+get_full_key_paths(_Pos, [], KeysToGet, _KeyPathAcc) ->
{[], KeysToGet};
-get_full_key_paths([{KeyId, Value, SubTree} | RestTree], KeysToGet, KeyPathAcc) ->
- KeysToGet2 = KeysToGet -- [KeyId],
+get_full_key_paths(Pos, [{KeyId, Value, SubTree} | RestTree], KeysToGet, KeyPathAcc) ->
+ KeysToGet2 = KeysToGet -- [{Pos, KeyId}],
CurrentNodeResult =
case length(KeysToGet2) == length(KeysToGet) of
true -> % not in the key list.
[];
false -> % this node is the key list. return it
- [[{KeyId, Value} | KeyPathAcc]]
+ [{Pos, [{KeyId, Value} | KeyPathAcc]}]
end,
- {KeysGotten, KeysRemaining} = get_full_key_paths(SubTree, KeysToGet2, [{KeyId, Value} | KeyPathAcc]),
- {KeysGotten2, KeysRemaining2} = get_full_key_paths(RestTree, KeysRemaining, KeyPathAcc),
+ {KeysGotten, KeysRemaining} = get_full_key_paths(Pos + 1, SubTree, KeysToGet2, [{KeyId, Value} | KeyPathAcc]),
+ {KeysGotten2, KeysRemaining2} = get_full_key_paths(Pos, RestTree, KeysRemaining, KeyPathAcc),
{CurrentNodeResult ++ KeysGotten ++ KeysGotten2, KeysRemaining2}.
get_all_leafs_full(Tree) ->
get_all_leafs_full(Tree, []).
-get_all_leafs_full([], _KeyPathAcc) ->
+get_all_leafs_full([], Acc) ->
+ Acc;
+get_all_leafs_full([{Pos, Tree} | Rest], Acc) ->
+ get_all_leafs_full(Rest, get_all_leafs_full_simple(Pos, [Tree], []) ++ Acc).
+
+get_all_leafs_full_simple(_Pos, [], _KeyPathAcc) ->
[];
-get_all_leafs_full([{KeyId, Value, []} | RestTree], KeyPathAcc) ->
- [[{KeyId, Value} | KeyPathAcc] | get_all_leafs_full(RestTree, KeyPathAcc)];
-get_all_leafs_full([{KeyId, Value, SubTree} | RestTree], KeyPathAcc) ->
- get_all_leafs_full(SubTree, [{KeyId, Value} | KeyPathAcc]) ++ get_all_leafs_full(RestTree, KeyPathAcc).
-
-get_all_leafs(Tree) ->
- get_all_leafs(Tree, []).
+get_all_leafs_full_simple(Pos, [{KeyId, Value, []} | RestTree], KeyPathAcc) ->
+ [{Pos, [{KeyId, Value} | KeyPathAcc]} | get_all_leafs_full_simple(Pos, RestTree, KeyPathAcc)];
+get_all_leafs_full_simple(Pos, [{KeyId, Value, SubTree} | RestTree], KeyPathAcc) ->
+ get_all_leafs_full_simple(Pos + 1, SubTree, [{KeyId, Value} | KeyPathAcc]) ++ get_all_leafs_full_simple(Pos, RestTree, KeyPathAcc).
+
+get_all_leafs(Trees) ->
+ get_all_leafs(Trees, []).
+
+get_all_leafs([], Acc) ->
+ Acc;
+get_all_leafs([{Pos, Tree}|Rest], Acc) ->
+ get_all_leafs(Rest, get_all_leafs_simple(Pos, [Tree], []) ++ Acc).
-get_all_leafs([], _KeyPathAcc) ->
+get_all_leafs_simple(_Pos, [], _KeyPathAcc) ->
[];
-get_all_leafs([{KeyId, Value, []} | RestTree], KeyPathAcc) ->
- [{KeyId, Value, [KeyId | KeyPathAcc]} | get_all_leafs(RestTree, KeyPathAcc)];
-get_all_leafs([{KeyId, _Value, SubTree} | RestTree], KeyPathAcc) ->
- get_all_leafs(SubTree, [KeyId | KeyPathAcc]) ++ get_all_leafs(RestTree, KeyPathAcc).
+get_all_leafs_simple(Pos, [{KeyId, Value, []} | RestTree], KeyPathAcc) ->
+ [{Value, {Pos, [KeyId | KeyPathAcc]}} | get_all_leafs_simple(Pos, RestTree, KeyPathAcc)];
+get_all_leafs_simple(Pos, [{KeyId, _Value, SubTree} | RestTree], KeyPathAcc) ->
+ get_all_leafs_simple(Pos + 1, SubTree, [KeyId | KeyPathAcc]) ++ get_all_leafs_simple(Pos, RestTree, KeyPathAcc).
+
-get_leaf_keys([]) ->
- [];
-get_leaf_keys([{Key, _Value, []} | RestTree]) ->
- [Key | get_leaf_keys(RestTree)];
-get_leaf_keys([{_Key, _Value, SubTree} | RestTree]) ->
- get_leaf_keys(SubTree) ++ get_leaf_keys(RestTree).
-
count_leafs([]) ->
0;
-count_leafs([{_Key, _Value, []} | RestTree]) ->
- 1 + count_leafs(RestTree);
-count_leafs([{_Key, _Value, SubTree} | RestTree]) ->
- count_leafs(SubTree) + count_leafs(RestTree).
+count_leafs([{_Pos,Tree}|Rest]) ->
+ count_leafs_simple([Tree]) + count_leafs(Rest).
+count_leafs_simple([]) ->
+ 0;
+count_leafs_simple([{_Key, _Value, []} | RestTree]) ->
+ 1 + count_leafs_simple(RestTree);
+count_leafs_simple([{_Key, _Value, SubTree} | RestTree]) ->
+ count_leafs_simple(SubTree) + count_leafs_simple(RestTree).
+
map(_Fun, []) ->
[];
-map(Fun, [{Key, Value, SubTree} | RestTree]) ->
- Value2 = Fun(Key, Value),
- [{Key, Value2, map(Fun, SubTree)} | map(Fun, RestTree)].
+map(Fun, [{Pos, Tree}|Rest]) ->
+ [NewTree] = map_simple(Fun, Pos, [Tree]),
+ [{Pos, NewTree} | map(Fun, Rest)].
+map_simple(_Fun, _Pos, []) ->
+ [];
+map_simple(Fun, Pos, [{Key, Value, SubTree} | RestTree]) ->
+ Value2 = Fun({Pos, Key}, Value),
+ [{Key, Value2, map_simple(Fun, Pos + 1, SubTree)} | map_simple(Fun, Pos, RestTree)].
+
+
+stem(Trees, Limit) ->
+ % flatten each branch in a tree into a tree path
+ Paths = get_all_leafs_full(Trees),
+
+ Paths2 = [{Pos, lists:sublist(Path, Limit)} || {Pos, Path} <- Paths],
+
+ % convert paths back to trees
+ lists:foldl(
+ fun({PathPos, Path},TreeAcc) ->
+ [SingleTree] = lists:foldl(
+ fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path),
+ {NewTrees, _} = merge(TreeAcc, [{PathPos + 1 - length(Path), SingleTree}]),
+ NewTrees
+ end, [], Paths2).
+
+test() ->
+ EmptyTree = [],
+ One = [{0, {"1","foo",[]}}],
+ TwoSibs = [{0, {"1","foo",[]}},
+ {0, {"2","foo",[]}}],
+ OneChild = [{0, {"1","foo",[{"1a", "bar", []}]}}],
+ TwoChild = [{0, {"1","foo", [{"1a", "bar", [{"1aa", "bar", []}]}]}}],
+ TwoChildSibs = [{0, {"1","foo", [{"1a", "bar", []},
+ {"1b", "bar", []}]}}],
+ Stemmed1a = [{1, {"1a", "bar", [{"1aa", "bar", []}]}}],
+ Stemmed1aa = [{2, {"1aa", "bar", []}}],
+
+ {EmptyTree, no_conflicts} = merge(EmptyTree, EmptyTree),
+ {One, no_conflicts} = merge(EmptyTree, One),
+ {One, no_conflicts} = merge(One, EmptyTree),
+ {TwoSibs, no_conflicts} = merge(One, TwoSibs),
+ {One, no_conflicts} = merge(One, One),
+ {TwoChild, no_conflicts} = merge(TwoChild, TwoChild),
+ {TwoChildSibs, no_conflicts} = merge(TwoChildSibs, TwoChildSibs),
+ {TwoChild, no_conflicts} = merge(TwoChild, Stemmed1aa),
+ {TwoChild, no_conflicts} = merge(TwoChild, Stemmed1a),
+ {Stemmed1a, no_conflicts} = merge(Stemmed1a, Stemmed1aa),
+ Expect1 = OneChild ++ Stemmed1aa,
+ {Expect1, conflicts} = merge(OneChild, Stemmed1aa),
+ {TwoChild, no_conflicts} = merge(Expect1, TwoChild),
+
+ []=find_missing(TwoChildSibs, [{0,"1"}, {1,"1a"}]),
+ [{0, "10"}, {100, "x"}]=find_missing(TwoChildSibs, [{0,"1"}, {0, "10"}, {1,"1a"}, {100, "x"}]),
+ [{0, "1"}, {100, "x"}]=find_missing(Stemmed1a, [{0,"1"}, {1,"1a"}, {100, "x"}]),
+ [{0, "1"}, {1,"1a"}, {100, "x"}]=find_missing(Stemmed1aa, [{0,"1"}, {1,"1a"}, {100, "x"}]),
+
+ {TwoChildSibs, []} = remove_leafs(TwoChildSibs, []),
+ {TwoChildSibs, []} = remove_leafs(TwoChildSibs, [{0, "1"}]),
+ {OneChild, [{1, "1b"}]} = remove_leafs(TwoChildSibs, [{1, "1b"}]),
+ {[], [{1, "1b"},{1, "1a"}]} = remove_leafs(TwoChildSibs, [{1, "1a"}, {1, "1b"}]),
+ {Stemmed1a, []} = remove_leafs(Stemmed1a, [{1, "1a"}]),
+ {[], [{2, "1aa"}]} = remove_leafs(Stemmed1a, [{2, "1aa"}]),
+ {TwoChildSibs, []} = remove_leafs(TwoChildSibs, []),
+
+ {[],[{0,"x"}]} = get_key_leafs(TwoChildSibs, [{0, "x"}]),
+
+ {[{"bar", {1, ["1a","1"]}}],[]} = get_key_leafs(TwoChildSibs, [{1, "1a"}]),
+ {[{"bar", {1, ["1a","1"]}},{"bar",{1, ["1b","1"]}}],[]} = get_key_leafs(TwoChildSibs, [{0, "1"}]),
+
+ {[{"foo", {0, ["1"]}}],[]} = get(TwoChildSibs, [{0, "1"}]),
+ {[{"bar", {1, ["1a", "1"]}}],[]} = get(TwoChildSibs, [{1, "1a"}]),
+
+ {[{0,[{"1", "foo"}]}],[]} = get_full_key_paths(TwoChildSibs, [{0, "1"}]),
+ {[{1,[{"1a", "bar"},{"1", "foo"}]}],[]} = get_full_key_paths(TwoChildSibs, [{1, "1a"}]),
+
+ [{2, [{"1aa", "bar"},{"1a", "bar"}]}] = get_all_leafs_full(Stemmed1a),
+ [{1, [{"1a", "bar"},{"1", "foo"}]}, {1, [{"1b", "bar"},{"1", "foo"}]}] = get_all_leafs_full(TwoChildSibs),
+
+ [{"bar", {2, ["1aa","1a"]}}] = get_all_leafs(Stemmed1a),
+ [{"bar", {1, ["1a", "1"]}}, {"bar", {1, ["1b","1"]}}] = get_all_leafs(TwoChildSibs),
+
+ 0 = count_leafs(EmptyTree),
+ 1 = count_leafs(One),
+ 2 = count_leafs(TwoChildSibs),
+ 1 = count_leafs(Stemmed1a),
+
+ TwoChild = stem(TwoChild, 3),
+ Stemmed1a = stem(TwoChild, 2),
+ Stemmed1aa = stem(TwoChild, 1),
+ ok.
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
Modified: couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=753448&r1=753447&r2=753448&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep.erl Fri Mar 13 22:15:34 2009
@@ -15,11 +15,11 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([replicate/3]).
+-export([replicate/2]).
-include_lib("couch_db.hrl").
-%% @spec replicate(Source::binary(), Target::binary(), Options::proplist()) ->
+%% @spec replicate(Source::binary(), Target::binary()) ->
%% {ok, Stats} | {error, Reason}
%% @doc Triggers a replication. Stats is a JSON Object with the following
%% keys: session_id (UUID), source_last_seq (integer), and history (array).
@@ -30,26 +30,29 @@
%% The supervisor will try to restart the replication in case of any error
%% other than shutdown. Just call this function again to listen for the
%% result of the retry.
-replicate(Source, Target, Options) ->
- Id = <<Source/binary, ":", Target/binary>>,
- Args = [?MODULE, [Source,Target,Options], []],
+replicate(Source, Target) ->
- Replicator = {Id,
+ {ok, HostName} = inet:gethostname(),
+ RepId = couch_util:to_hex(
+ erlang:md5(term_to_binary([HostName, Source, Target]))),
+ Args = [?MODULE, [RepId, Source,Target], []],
+
+ Replicator = {RepId,
{gen_server, start_link, Args},
transient,
- 10000,
+ 1,
worker,
[?MODULE]
},
Server = case supervisor:start_child(couch_rep_sup, Replicator) of
{ok, Pid} ->
- ?LOG_INFO("starting new replication ~p at ~p", [Id, Pid]),
+ ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
Pid;
{error, already_present} ->
- case supervisor:restart_child(couch_rep_sup, Id) of
+ case supervisor:restart_child(couch_rep_sup, RepId) of
{ok, Pid} ->
- ?LOG_INFO("starting replication ~p at ~p", [Id, Pid]),
+ ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
Pid;
{error, running} ->
%% this error occurs if multiple replicators are racing
@@ -57,16 +60,16 @@
%% the Pid by calling start_child again.
{error, {already_started, Pid}} =
supervisor:start_child(couch_rep_sup, Replicator),
- ?LOG_INFO("replication ~p already running at ~p", [Id, Pid]),
+ ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
Pid
end;
{error, {already_started, Pid}} ->
- ?LOG_INFO("replication ~p already running at ~p", [Id, Pid]),
+ ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
Pid
end,
case gen_server:call(Server, get_result, infinity) of
- retry -> replicate(Source, Target, Options);
+ retry -> replicate(Source, Target);
Else -> Else
end.
@@ -79,6 +82,7 @@
headers
}).
+
-record(state, {
context,
current_seq,
@@ -90,18 +94,14 @@
listeners = []
}).
-init([Source, Target, Options]) ->
+
+init([RepId, Source, Target]) ->
process_flag(trap_exit, true),
- {ok, DbSrc} =
- open_db(Source, proplists:get_value(source_options, Options, [])),
- {ok, DbTgt} =
- open_db(Target, proplists:get_value(target_options, Options, [])),
-
- {ok, Host} = inet:gethostname(),
- HostBin = list_to_binary(Host),
- DocKey = <<?LOCAL_DOC_PREFIX, HostBin/binary, ":", Source/binary, ":",
- Target/binary>>,
+ {ok, DbSrc, SrcName} = open_db(Source),
+ {ok, DbTgt, TgtName} = open_db(Target),
+
+ DocKey = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
{ok, InfoSrc} = get_db_info(DbSrc),
{ok, InfoTgt} = get_db_info(DbTgt),
@@ -110,49 +110,49 @@
SrcInstanceStartTime = proplists:get_value(instance_start_time, InfoSrc),
TgtInstanceStartTime = proplists:get_value(instance_start_time, InfoTgt),
- case proplists:get_value(full, Options, false)
- orelse proplists:get_value("full", Options, false) of
+ RepRecDocSrc =
+ case open_doc(DbSrc, DocKey, []) of
+ {ok, SrcDoc} ->
+ ?LOG_DEBUG("Found existing replication record on source", []),
+ SrcDoc;
+ _ -> #doc{id=DocKey}
+ end,
+
+ RepRecDocTgt =
+ case open_doc(DbTgt, DocKey, []) of
+ {ok, TgtDoc} ->
+ ?LOG_DEBUG("Found existing replication record on target", []),
+ TgtDoc;
+ _ -> #doc{id=DocKey}
+ end,
+
+ #doc{body={RepRecProps}} = RepRecDocSrc,
+ #doc{body={RepRecPropsTgt}} = RepRecDocTgt,
+
+ case proplists:get_value(<<"session_id">>, RepRecProps) ==
+ proplists:get_value(<<"session_id">>, RepRecPropsTgt) of
true ->
- RepRecSrc = RepRecTgt = #doc{id=DocKey};
+ % if the records have the same session id,
+ % then we have a valid replication history
+ OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0),
+ OldHistory = proplists:get_value(<<"history">>, RepRecProps, []);
false ->
- RepRecSrc = case open_doc(DbSrc, DocKey, []) of
- {ok, SrcDoc} ->
- ?LOG_DEBUG("Found existing replication record on source", []),
- SrcDoc;
- _ -> #doc{id=DocKey}
- end,
-
- RepRecTgt = case open_doc(DbTgt, DocKey, []) of
- {ok, TgtDoc} ->
- ?LOG_DEBUG("Found existing replication record on target", []),
- TgtDoc;
- _ -> #doc{id=DocKey}
- end
- end,
-
- #doc{body={OldRepHistoryProps}} = RepRecSrc,
- #doc{body={OldRepHistoryPropsTrg}} = RepRecTgt,
-
- SeqNum = case OldRepHistoryProps == OldRepHistoryPropsTrg of
- true ->
- % if the records are identical, then we have a valid replication history
- proplists:get_value(<<"source_last_seq">>, OldRepHistoryProps, 0);
- false ->
- ?LOG_INFO("Replication records differ. "
+ ?LOG_INFO("Replication records differ. "
"Performing full replication instead of incremental.", []),
- ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
- [OldRepHistoryProps, OldRepHistoryPropsTrg]),
- 0
+ ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+ [RepRecProps, RepRecPropsTgt]),
+ OldSeqNum = 0,
+ OldHistory = []
end,
Context = [
- {start_seq, SeqNum},
- {history, OldRepHistoryProps},
+ {start_seq, OldSeqNum},
+ {history, OldHistory},
{rep_starttime, ReplicationStartTime},
{src_starttime, SrcInstanceStartTime},
{tgt_starttime, TgtInstanceStartTime},
- {src_record, RepRecSrc},
- {tgt_record, RepRecTgt}
+ {src_record, RepRecDocSrc},
+ {tgt_record, RepRecDocTgt}
],
Stats = ets:new(replication_stats, [set, private]),
@@ -160,16 +160,17 @@
ets:insert(Stats, {missing_revs, 0}),
ets:insert(Stats, {docs_read, 0}),
ets:insert(Stats, {docs_written, 0}),
+ ets:insert(Stats, {doc_write_failures, 0}),
- couch_task_status:add_task("Replication", <<Source/binary, " -> ",
- Target/binary>>, "Starting"),
+ couch_task_status:add_task("Replication", <<SrcName/binary, " -> ",
+ TgtName/binary>>, "Starting"),
Parent = self(),
- Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{SeqNum,0}) end),
+ Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{OldSeqNum,0}) end),
State = #state{
context = Context,
- current_seq = SeqNum,
+ current_seq = OldSeqNum,
enum_pid = Pid,
source = DbSrc,
target = DbTgt,
@@ -178,7 +179,6 @@
{ok, State}.
-
handle_call(get_result, From, #state{listeners=L} = State) ->
{noreply, State#state{listeners=[From|L]}};
@@ -191,7 +191,7 @@
target = Target,
stats = Stats
} = State,
-
+
ets:update_counter(Stats, missing_revs, length(Revs)),
%% get document(s)
@@ -203,8 +203,11 @@
{NewBuffer, NewContext} = case couch_util:should_flush() of
true ->
Docs2 = lists:flatten([Docs|Buffer]),
- ok = update_docs(Target, Docs2, [], false),
- ets:update_counter(Stats, docs_written, length(Docs2)),
+ {ok, Errors} = update_docs(Target, Docs2, [], replicated_changes),
+ dump_update_errors(Errors),
+ ets:update_counter(Stats, doc_write_failures, length(Errors)),
+ ets:update_counter(Stats, docs_written, length(Docs2) -
+ length(Errors)),
{ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats),
{[], Ctxt};
false ->
@@ -255,8 +258,11 @@
stats = Stats
} = State,
- ok = update_docs(Target, lists:flatten(Buffer), [], false),
- ets:update_counter(Stats, docs_written, lists:flatlength(Buffer)),
+ {ok, Errors} = update_docs(Target, lists:flatten(Buffer), [], replicated_changes),
+ dump_update_errors(Errors),
+ ets:update_counter(Stats, doc_write_failures, length(Errors)),
+ ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) -
+ length(Errors)),
couch_task_status:update("Finishing"),
@@ -264,9 +270,12 @@
ets:delete(Stats),
close_db(Target),
- %% reply to original requester
- [Original|Rest] = Listeners,
- gen_server:reply(Original, {ok, NewRepHistory}),
+ case Listeners of
+ [Original|Rest] ->
+ %% reply to original requester
+ gen_server:reply(Original, {ok, NewRepHistory});
+ Rest -> ok
+ end,
%% maybe trigger another replication. If this replicator uses a local
%% source Db, changes to that Db since we started will not be included in
@@ -304,6 +313,16 @@
%% internal functions
%%=============================================================================
+
+% we should probably write these to a special replication log
+% or have a callback where the caller decides what to do with replication
+% errors.
+dump_update_errors([]) -> ok;
+dump_update_errors([{{Id, Rev}, Error}|Rest]) ->
+ ?LOG_INFO("error replicating document \"~s\" rev \"~s\":~p",
+ [Id, couch_doc:rev_to_str(Rev), Error]),
+ dump_update_errors(Rest).
+
attachment_loop(ReqId) ->
couch_util:should_flush(),
receive
@@ -354,6 +373,16 @@
end,
{Name, {Type, {RcvFun, Length}}}.
+
+open_db({remote, Url, Headers})->
+ {ok, #http_db{uri=?b2l(Url), headers=Headers}, Url};
+open_db({local, DbName, UserCtx})->
+ case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
+ {ok, Db} -> {ok, Db, DbName};
+ Error -> Error
+ end.
+
+
close_db(#http_db{})->
ok;
close_db(Db)->
@@ -362,27 +391,38 @@
do_checkpoint(Source, Target, Context, NewSeqNum, Stats) ->
?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
[
- {start_seq, SeqNum},
- {history, OldRepHistoryProps},
+ {start_seq, StartSeqNum},
+ {history, OldHistory},
{rep_starttime, ReplicationStartTime},
{src_starttime, SrcInstanceStartTime},
{tgt_starttime, TgtInstanceStartTime},
- {src_record, RepRecSrc},
- {tgt_record, RepRecTgt}
+ {src_record, #doc{body={LastRepRecord}}=RepRecDocSrc},
+ {tgt_record, RepRecDocTgt}
] = Context,
- NewHistory = case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
+ case NewSeqNum == StartSeqNum andalso OldHistory /= [] of
true ->
% nothing changed, don't record results
- {OldRepHistoryProps};
+ {ok, {[{<<"no_changes">>, true} | LastRepRecord]}, Context};
false ->
+ % something changed, record results for incremental replication,
+
% commit changes to both src and tgt. The src because if changes
- % we replicated are lost, we'll record the a seq number of ahead
- % of what was committed and therefore lose future changes with the
- % same seq nums.
- {ok, SrcInstanceStartTime2} = ensure_full_commit(Source),
+ % we replicated are lost, we'll record a seq number ahead
+ % of what was committed. If those changes are lost and the seq number
+ % reverts to a previous committed value, we will skip future changes
+ % when new doc updates are given our already replicated seq nums.
+
+ % commit the src async
+ ParentPid = self(),
+ SrcCommitPid = spawn_link(fun() ->
+ ParentPid ! {self(), ensure_full_commit(Source)} end),
+
+ % commit tgt sync
{ok, TgtInstanceStartTime2} = ensure_full_commit(Target),
+ receive {SrcCommitPid, {ok, SrcInstanceStartTime2}} -> ok end,
+
RecordSeqNum =
if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
TgtInstanceStartTime2 == TgtInstanceStartTime ->
@@ -391,60 +431,57 @@
?LOG_INFO("A server has restarted sinced replication start. "
"Not recording the new sequence number to ensure the "
"replication is redone and documents reexamined.", []),
- SeqNum
+ StartSeqNum
end,
- %% format replication history
- JsonStats = [
+ NewHistoryEntry = {
+ [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
+ {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+ {<<"start_last_seq">>, StartSeqNum},
+ {<<"end_last_seq">>, NewSeqNum},
{<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
{<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
{<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
- {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}
+ {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
+ {<<"doc_write_failures">>, ets:lookup_element(Stats, doc_write_failures, 2)}
+ ]},
+ % limit history to 50 entries
+ HistEntries =lists:sublist([NewHistoryEntry | OldHistory], 50),
+
+ NewRepHistory =
+ {[{<<"session_id">>, couch_util:new_uuid()},
+ {<<"source_last_seq">>, RecordSeqNum},
+ {<<"history">>, HistEntries}]},
+
+ {ok, {SrcRevPos,SrcRevId}} = update_doc(Source,
+ RepRecDocSrc#doc{body=NewRepHistory}, []),
+ {ok, {TgtRevPos,TgtRevId}} = update_doc(Target,
+ RepRecDocTgt#doc{body=NewRepHistory}, []),
+
+ NewContext = [
+ {start_seq, StartSeqNum},
+ {history, OldHistory},
+ {rep_starttime, ReplicationStartTime},
+ {src_starttime, SrcInstanceStartTime},
+ {tgt_starttime, TgtInstanceStartTime},
+ {src_record, RepRecDocSrc#doc{revs={SrcRevPos,[SrcRevId]}}},
+ {tgt_record, RepRecDocTgt#doc{revs={TgtRevPos,[TgtRevId]}}}
],
-
- HistEntries =[
- {
- [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
- {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
- {<<"start_last_seq">>, SeqNum},
- {<<"end_last_seq">>, NewSeqNum} | JsonStats]}
- | proplists:get_value(<<"history">>, OldRepHistoryProps, [])],
- % something changed, record results
- {[
- {<<"session_id">>, couch_util:new_uuid()},
- {<<"source_last_seq">>, RecordSeqNum},
- {<<"history">>, lists:sublist(HistEntries, 50)}
- ]}
- end,
-
- %% update local documents
- RepRecSrc = proplists:get_value(src_record, Context),
- RepRecTgt = proplists:get_value(tgt_record, Context),
- {ok, TgtRev} = update_local_doc(Target, RepRecTgt#doc{body=NewHistory}, []),
- {ok, SrcRev} = update_local_doc(Source, RepRecSrc#doc{body=NewHistory}, []),
-
- NewContext = [
- {start_seq, SeqNum},
- {history, OldRepHistoryProps},
- {rep_starttime, ReplicationStartTime},
- {src_starttime, SrcInstanceStartTime},
- {tgt_starttime, TgtInstanceStartTime},
- {src_record, RepRecSrc#doc{revs=[SrcRev]}},
- {tgt_record, RepRecTgt#doc{revs=[TgtRev]}}
- ],
- {ok, NewHistory, NewContext}.
+ {ok, NewRepHistory, NewContext}
+
+ end.
do_http_request(Url, Action, Headers) ->
do_http_request(Url, Action, Headers, []).
do_http_request(Url, Action, Headers, JsonBody) ->
- do_http_request(?b2l(?l2b(Url)), Action, Headers, JsonBody, 10).
+ do_http_request(Url, Action, Headers, JsonBody, 10).
do_http_request(Url, Action, _Headers, _JsonBody, 0) ->
?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s",
[Action, Url]),
- exit({http_request_failed, ?l2b(Url)});
+ exit({http_request_failed, Url});
do_http_request(Url, Action, Headers, JsonBody, Retries) ->
?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]),
Body =
@@ -498,7 +535,6 @@
[] ->
gen_server:call(Pid, {fin, {StartSeq, RevsCount}}, infinity);
DocInfoList ->
- % UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList],
SrcRevsList = lists:map(fun(SrcDocInfo) ->
#doc_info{id=Id,
rev=Rev,
@@ -521,13 +557,8 @@
enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2})
end.
-fix_url(UrlBin) ->
- Url = binary_to_list(UrlBin),
- case lists:last(Url) of
- $/ -> Url;
- _ -> Url ++ "/"
- end.
+
get_db_info(#http_db{uri=DbUrl, headers=Headers}) ->
{DbProps} = do_http_request(DbUrl, get, Headers),
{ok, [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps]};
@@ -542,12 +573,12 @@
{RowValueProps} = proplists:get_value(<<"value">>, RowInfoList),
#doc_info{
id=proplists:get_value(<<"id">>, RowInfoList),
- rev=proplists:get_value(<<"rev">>, RowValueProps),
+ rev=couch_doc:parse_rev(proplists:get_value(<<"rev">>, RowValueProps)),
update_seq = proplists:get_value(<<"key">>, RowInfoList),
conflict_revs =
- proplists:get_value(<<"conflicts">>, RowValueProps, []),
+ couch_doc:parse_revs(proplists:get_value(<<"conflicts">>, RowValueProps, [])),
deleted_conflict_revs =
- proplists:get_value(<<"deleted_conflicts">>, RowValueProps, []),
+ couch_doc:parse_revs(proplists:get_value(<<"deleted_conflicts">>, RowValueProps, [])),
deleted = proplists:get_value(<<"deleted">>, RowValueProps, false)
}
end, proplists:get_value(<<"rows">>, Results));
@@ -561,25 +592,18 @@
lists:reverse(DocInfoList).
get_missing_revs(#http_db{uri=DbUrl, headers=Headers}, DocIdRevsList) ->
+ DocIdRevsList2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- DocIdRevsList],
{ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers,
- {DocIdRevsList}),
- {MissingRevs} = proplists:get_value(<<"missing_revs">>, ResponseMembers),
- {ok, MissingRevs};
+ {DocIdRevsList2}),
+ {DocMissingRevsList} = proplists:get_value(<<"missing_revs">>, ResponseMembers),
+ DocMissingRevsList2 = [{Id, couch_doc:parse_revs(MissingRevStrs)} || {Id, MissingRevStrs} <- DocMissingRevsList],
+ {ok, DocMissingRevsList2};
get_missing_revs(Db, DocId) ->
couch_db:get_missing_revs(Db, DocId).
-open_http_db(UrlBin, Options) ->
- Headers = proplists:get_value(headers, Options, {[]}),
- {ok, #http_db{uri=fix_url(UrlBin), headers=Headers}}.
-
-open_db(<<"http://", _/binary>>=Url, Options)->
- open_http_db(Url, Options);
-open_db(<<"https://", _/binary>>=Url, Options)->
- open_http_db(Url, Options);
-open_db(DbName, Options)->
- couch_db:open(DbName, Options).
-open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, []) ->
+open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, Options) ->
+ [] = Options,
case do_http_request(DbUrl ++ url_encode(DocId), get, Headers) of
{[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} ->
{couch_util:to_existing_atom(ErrId), Reason};
@@ -589,7 +613,9 @@
open_doc(Db, DocId, Options) ->
couch_db:open_doc(Db, DocId, Options).
-open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs, _Opts) ->
+open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs0,
+ [latest]) ->
+ Revs = couch_doc:rev_to_strs(Revs0),
BaseUrl = DbUrl ++ url_encode(DocId) ++ "?revs=true&latest=true",
%% MochiWeb expects URLs < 8KB long, so maybe split into multiple requests
@@ -612,39 +638,52 @@
lists:flatten(?JSON_ENCODE(lists:reverse(Rest))), get, Headers)
end,
- Results =
- lists:map(fun({[{<<"missing">>, Rev}]}) ->
- {{not_found, missing}, Rev};
- ({[{<<"ok">>, JsonDoc}]}) ->
+ Results =
+ lists:map(
+ fun({[{<<"missing">>, Rev}]}) ->
+ {{not_found, missing}, couch_doc:parse_rev(Rev)};
+ ({[{<<"ok">>, JsonDoc}]}) ->
#doc{id=Id, attachments=Attach} = Doc = couch_doc:from_json_obj(JsonDoc),
Attach2 = [attachment_stub_converter(DbS,Id,A) || A <- Attach],
{ok, Doc#doc{attachments=Attach2}}
- end, JsonResults),
+ end, JsonResults),
{ok, Results};
open_doc_revs(Db, DocId, Revs, Options) ->
couch_db:open_doc_revs(Db, DocId, Revs, Options).
-update_docs(_, [], _, _) ->
- ok;
-update_docs(#http_db{uri=DbUrl, headers=Headers}, Docs, [], NewEdits) ->
- JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
- {Returned} =
- do_http_request(DbUrl ++ "_bulk_docs", post, Headers,
- {[{new_edits, NewEdits}, {docs, JsonDocs}]}),
- true = proplists:get_value(<<"ok">>, Returned),
- ok;
-update_docs(Db, Docs, Options, NewEdits) ->
- couch_db:update_docs(Db, Docs, Options, NewEdits).
-update_local_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, []) ->
+update_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, Options) ->
+ [] = Options,
Url = DbUrl ++ url_encode(DocId),
{ResponseMembers} = do_http_request(Url, put, Headers,
- couch_doc:to_json_obj(Doc, [revs,attachments])),
- RevId = proplists:get_value(<<"rev">>, ResponseMembers),
- {ok, RevId};
-update_local_doc(Db, Doc, Options) ->
+ couch_doc:to_json_obj(Doc, [attachments])),
+ Rev = proplists:get_value(<<"rev">>, ResponseMembers),
+ {ok, couch_doc:parse_rev(Rev)};
+update_doc(Db, Doc, Options) ->
couch_db:update_doc(Db, Doc, Options).
+update_docs(_, [], _, _) ->
+ {ok, []};
+update_docs(#http_db{uri=DbUrl, headers=Headers}, Docs, [], replicated_changes) ->
+ JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
+ ErrorsJson =
+ do_http_request(DbUrl ++ "_bulk_docs", post, Headers,
+ {[{new_edits, false}, {docs, JsonDocs}]}),
+ ErrorsList =
+ lists:map(
+ fun({Props}) ->
+ Id = proplists:get_value(<<"id">>, Props),
+ Rev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, Props)),
+ ErrId = couch_util:to_existing_atom(
+ proplists:get_value(<<"error">>, Props)),
+ Reason = proplists:get_value(<<"reason">>, Props),
+ Error = {ErrId, Reason},
+ {{Id, Rev}, Error}
+ end, ErrorsJson),
+ {ok, ErrorsList};
+update_docs(Db, Docs, Options, UpdateType) ->
+ couch_db:update_docs(Db, Docs, Options, UpdateType).
+
up_to_date(#http_db{}, _Seq) ->
true;
up_to_date(Source, Seq) ->
Modified: couchdb/trunk/src/couchdb/couch_util.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_util.erl?rev=753448&r1=753447&r2=753448&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_util.erl (original)
+++ couchdb/trunk/src/couchdb/couch_util.erl Fri Mar 13 22:15:34 2009
@@ -13,7 +13,7 @@
-module(couch_util).
-export([start_driver/1]).
--export([should_flush/0, should_flush/1, to_existing_atom/1]).
+-export([should_flush/0, should_flush/1, to_existing_atom/1, to_binary/1]).
-export([new_uuid/0, rand32/0, implode/2, collate/2, collate/3]).
-export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1]).
-export([encodeBase64/1, decodeBase64/1, to_hex/1,parse_term/1,dict_find/3]).
@@ -57,6 +57,19 @@
to_digit(N) -> $a + N-10.
+to_binary(V) when is_binary(V) ->
+ V;
+to_binary(V) when is_list(V) ->
+ try list_to_binary(V)
+ catch
+ _ -> list_to_binary(io_lib:format("~p", [V]))
+ end;
+to_binary(V) when is_atom(V) ->
+ list_to_binary(atom_to_list(V));
+to_binary(V) ->
+ list_to_binary(io_lib:format("~p", [V])).
+
+
parse_term(Bin) when is_binary(Bin)->
parse_term(binary_to_list(Bin));
parse_term(List) ->