You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by cm...@apache.org on 2008/03/29 00:32:30 UTC
svn commit: r642432 [15/16] - in /incubator/couchdb/trunk: ./ bin/
build-contrib/ etc/ etc/conf/ etc/default/ etc/init/ etc/launchd/
etc/logrotate.d/ share/ share/server/ share/www/ share/www/browse/
share/www/image/ share/www/script/ share/www/style/ ...
Added: incubator/couchdb/trunk/src/couchdb/couch_event_sup.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_event_sup.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_event_sup.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_event_sup.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,69 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% The purpose of this module is to allow event handlers to particpate in Erlang
+%% supervisor trees. It provide a monitorable process that crashes if the event
+%% handler fails. The process, when shutdown, deregisters the event handler.
+
+-module(couch_event_sup).
+-behaviour(gen_server).
+
+-include("couch_db.hrl").
+
+-export([start_link/3,start_link/4, stop/1]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
+
+%
+% Instead calling the
+% ok = gen_event:add_sup_handler(error_logger, my_log, Args)
+%
+% do this:
+% {ok, LinkedPid} = couch_event_sup:start_link(error_logger, my_log, Args)
+%
+% The benefit is the event is now part of the process tree, and can be
+% started, restarted and shutdown consistently like the rest of the server
+% components.
+%
+% And now if the "event" crashes, the supervisor is notified and can restart
+% the event handler.
+%
+% Use this form to named process:
+% {ok, LinkedPid} = couch_event_sup:start_link({local, my_log}, error_logger, my_log, Args)
+%
+
+start_link(EventMgr, EventHandler, Args) ->
+ gen_server:start_link(couch_event_sup, {EventMgr, EventHandler, Args}, []).
+
+start_link(ServerName, EventMgr, EventHandler, Args) ->
+ gen_server:start_link(ServerName, couch_event_sup, {EventMgr, EventHandler, Args}, []).
+
+stop(Pid) ->
+ gen_server:cast(Pid, stop).
+
+init({EventMgr, EventHandler, Args}) ->
+ ok = gen_event:add_sup_handler(EventMgr, EventHandler, Args),
+ {ok, {EventMgr, EventHandler}}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+handle_call(_Whatever, _From, State) ->
+ {ok, State}.
+
+handle_cast(stop, State) ->
+ {stop, normal, State}.
+
+handle_info({gen_event_EXIT, _Handler, Reason}, State) ->
+ {stop, Reason, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
Added: incubator/couchdb/trunk/src/couchdb/couch_file.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_file.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_file.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_file.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,323 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_file).
+-behaviour(gen_server).
+
+-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
+
+-export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]).
+-export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
+
+%%----------------------------------------------------------------------
+%% Args: Valid Options are [create] and [create,overwrite].
+%% Files are opened in read/write mode.
+%% Returns: On success, {ok, Fd}
+%% or {error, Reason} if the file could not be opened.
+%%----------------------------------------------------------------------
+
+open(Filepath) ->
+ open(Filepath, []).
+
+open(Filepath, Options) ->
+ case gen_server:start_link(couch_file, {Filepath, Options, self()}, []) of
+ {ok, FdPid} ->
+ % we got back an ok, but that doesn't really mean it was successful.
+ % Instead the true status has been sent back to us as a message.
+ % We do this because if the gen_server doesn't initialize properly,
+ % it generates a crash report that will get logged. This avoids
+ % that mess, because we don't want crash reports generated
+ % every time a file cannot be found.
+ receive
+ {FdPid, ok} ->
+ {ok, FdPid};
+ {FdPid, Error} ->
+ Error
+ end;
+ Error ->
+ Error
+ end.
+
+
+%%----------------------------------------------------------------------
+%% Args: Pos is the offset from the beginning of the file, Bytes is
+%% is the number of bytes to read.
+%% Returns: {ok, Binary} where Binary is a binary data from disk
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+pread(Fd, Pos, Bytes) when Bytes > 0 ->
+ gen_server:call(Fd, {pread, Pos, Bytes}).
+
+
+%%----------------------------------------------------------------------
+%% Args: Pos is the offset from the beginning of the file, Bin is
+%% is the binary to write
+%% Returns: ok
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+pwrite(Fd, Pos, Bin) ->
+ gen_server:call(Fd, {pwrite, Pos, Bin}).
+
+%%----------------------------------------------------------------------
+%% Purpose: To append a segment of zeros to the end of the file.
+%% Args: Bytes is the number of bytes to append to the file.
+%% Returns: {ok, Pos} where Pos is the file offset to the beginning of
+%% the new segments.
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+expand(Fd, Bytes) when Bytes > 0 ->
+ gen_server:call(Fd, {expand, Bytes}).
+
+
+%%----------------------------------------------------------------------
+%% Purpose: To append an Erlang term to the end of the file.
+%% Args: Erlang term to serialize and append to the file.
+%% Returns: {ok, Pos} where Pos is the file offset to the beginning the
+%% serialized term. Use pread_term to read the term back.
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+append_term(Fd, Term) ->
+ gen_server:call(Fd, {append_term, Term}).
+
+
+%%----------------------------------------------------------------------
+%% Purpose: Reads a term from a file that was written with append_term
+%% Args: Pos, the offset into the file where the term is serialized.
+%% Returns: {ok, Term}
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+pread_term(Fd, Pos) ->
+ gen_server:call(Fd, {pread_term, Pos}).
+
+
+%%----------------------------------------------------------------------
+%% Purpose: The length of a file, in bytes.
+%% Returns: {ok, Bytes}
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+% length in bytes
+bytes(Fd) ->
+ gen_server:call(Fd, bytes).
+
+%%----------------------------------------------------------------------
+%% Purpose: Truncate a file to the number of bytes.
+%% Returns: ok
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+truncate(Fd, Pos) ->
+ gen_server:call(Fd, {truncate, Pos}).
+
+%%----------------------------------------------------------------------
+%% Purpose: Ensure all bytes written to the file are flushed to disk.
+%% Returns: ok
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+sync(Fd) ->
+ gen_server:call(Fd, sync).
+
+%%----------------------------------------------------------------------
+%% Purpose: Close the file. Is performed asynchronously.
+%% Returns: ok
+%%----------------------------------------------------------------------
+close(Fd) ->
+ gen_server:cast(Fd, close).
+
+
+write_header(Fd, Prefix, Data) ->
+ % The leading bytes in every db file, the sig and the file version:
+ %the actual header data
+ TermBin = term_to_binary(Data),
+ % the size of all the bytes written to the header, including the md5 signature (16 bytes)
+ FilledSize = size(Prefix) + size(TermBin) + 16,
+ case FilledSize > ?HEADER_SIZE of
+ true ->
+ % too big!
+ {error, error_header_too_large};
+ false ->
+ % pad out the header with zeros, then take the md5 hash
+ PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize))>>,
+ Sig = erlang:md5([TermBin, PadZeros]),
+ % now we assemble the final header binary and write to disk
+ WriteBin = <<Prefix/binary, TermBin/binary, PadZeros/binary, Sig/binary>>,
+ ?HEADER_SIZE = size(WriteBin), % sanity check
+ DblWriteBin = [WriteBin, WriteBin],
+ ok = pwrite(Fd, 0, DblWriteBin)
+ end.
+
+
+read_header(Fd, Prefix) ->
+ {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)),
+ <<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
+ % read the first header
+ case extract_header(Prefix, Bin1) of
+ {ok, Header1} ->
+ case extract_header(Prefix, Bin2) of
+ {ok, Header2} ->
+ case Header1 == Header2 of
+ true ->
+ % Everything is completely normal!
+ {ok, Header1};
+ false ->
+ % To get here we must have two different header versions with signatures intact.
+ % It's weird but possible (a commit failure right at the 2k boundary). Log it and take the first.
+ couch_log:info("Header version differences.~nPrimary Header: ~p~nSecondary Header: ~p", [Header1, Header2]),
+ {ok, Header1}
+ end;
+ {error, Error} ->
+ % error reading second header. It's ok, but log it.
+ couch_log:info("Secondary header corruption (error: ~p). Using primary header.", [Error]),
+ {ok, Header1}
+ end;
+ {error, Error} ->
+ % error reading primary header
+ case extract_header(Prefix, Bin2) of
+ {ok, Header2} ->
+ % log corrupt primary header. It's ok since the secondary is still good.
+ couch_log:info("Primary header corruption (error: ~p). Using secondary header.", [Error]),
+ {ok, Header2};
+ _ ->
+ % error reading secondary header too
+ % return the error, no need to log anything as the caller will be responsible for dealing with the error.
+ {error, Error}
+ end
+ end.
+
+
+extract_header(Prefix, Bin) ->
+ SizeOfPrefix = size(Prefix),
+ SizeOfTermBin = ?HEADER_SIZE -
+ SizeOfPrefix -
+ 16, % md5 sig
+
+ <<HeaderPrefix:SizeOfPrefix/binary, TermBin:SizeOfTermBin/binary, Sig:16/binary>> = Bin,
+
+ % check the header prefix
+ case HeaderPrefix of
+ Prefix ->
+ % check the integrity signature
+ case erlang:md5(TermBin) == Sig of
+ true ->
+ Header = binary_to_term(TermBin),
+ {ok, Header};
+ false ->
+ {error, header_corrupt}
+ end;
+ _ ->
+ {error, unknown_header_type}
+ end.
+
+
+
+init_status_ok(ReturnPid, Fd) ->
+ ReturnPid ! {self(), ok}, % signal back ok
+ {ok, Fd}.
+
+init_status_error(ReturnPid, Error) ->
+ ReturnPid ! {self(), Error}, % signal back error status
+ self() ! self_close, % tell ourself to close async
+ {ok, nil}.
+
+% server functions
+
+init({Filepath, Options, ReturnPid}) ->
+ case lists:member(create, Options) of
+ true ->
+ filelib:ensure_dir(Filepath),
+ case file:open(Filepath, [read, write, raw, binary]) of
+ {ok, Fd} ->
+ {ok, Length} = file:position(Fd, eof),
+ case Length > 0 of
+ true ->
+ % this means the file already exists and has data.
+ % FYI: We don't differentiate between empty files and non-existant
+ % files here.
+ case lists:member(overwrite, Options) of
+ true ->
+ {ok, 0} = file:position(Fd, 0),
+ ok = file:truncate(Fd),
+ init_status_ok(ReturnPid, Fd);
+ false ->
+ ok = file:close(Fd),
+ init_status_error(ReturnPid, {error, file_exists})
+ end;
+ false ->
+ init_status_ok(ReturnPid, Fd)
+ end;
+ Error ->
+ init_status_error(ReturnPid, Error)
+ end;
+ false ->
+ % open in read mode first, so we don't create the file if it doesn't exist.
+ case file:open(Filepath, [read, raw]) of
+ {ok, Fd_Read} ->
+ {ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
+ ok = file:close(Fd_Read),
+ init_status_ok(ReturnPid, Fd);
+ Error ->
+ init_status_error(ReturnPid, Error)
+ end
+ end.
+
+
+terminate(_Reason, nil) ->
+ ok;
+terminate(_Reason, Fd) ->
+ file:close(Fd),
+ ok.
+
+
+handle_call({pread, Pos, Bytes}, _From, Fd) ->
+ {reply, file:pread(Fd, Pos, Bytes), Fd};
+handle_call({pwrite, Pos, Bin}, _From, Fd) ->
+ {reply, file:pwrite(Fd, Pos, Bin), Fd};
+handle_call({expand, Num}, _From, Fd) ->
+ {ok, Pos} = file:position(Fd, eof),
+ {reply, {file:pwrite(Fd, Pos + Num - 1, <<0>>), Pos}, Fd};
+handle_call(bytes, _From, Fd) ->
+ {reply, file:position(Fd, eof), Fd};
+handle_call(sync, _From, Fd) ->
+ {reply, file:sync(Fd), Fd};
+handle_call({truncate, Pos}, _From, Fd) ->
+ {ok, Pos} = file:position(Fd, Pos),
+ {reply, file:truncate(Fd), Fd};
+handle_call({append_term, Term}, _From, Fd) ->
+ Bin = term_to_binary(Term, [compressed]),
+ TermLen = size(Bin),
+ Bin2 = <<TermLen:32, Bin/binary>>,
+ {ok, Pos} = file:position(Fd, eof),
+ {reply, {file:pwrite(Fd, Pos, Bin2), Pos}, Fd};
+handle_call({pread_term, Pos}, _From, Fd) ->
+ {ok, <<TermLen:32>>}
+ = file:pread(Fd, Pos, 4),
+ {ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
+ {reply, {ok, binary_to_term(Bin)}, Fd}.
+
+
+handle_cast(close, Fd) ->
+ {stop,normal,Fd}. % causes terminate to be called
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info(self_close, State) ->
+ {stop,normal,State};
+handle_info(_Info, State) ->
+ {noreply, State}.
Added: incubator/couchdb/trunk/src/couchdb/couch_ft_query.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_ft_query.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_ft_query.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_ft_query.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,78 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_ft_query).
+-behaviour(gen_server).
+
+-export([start_link/1, execute/2]).
+
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3, stop/0]).
+
+-define(ERR_HANDLE, {Port, {exit_status, Status}} -> {stop, {unknown_error, Status}, {unknown_error, Status}, Port}).
+
+start_link(QueryExec) ->
+ gen_server:start_link({local, couch_ft_query}, couch_ft_query, QueryExec, []).
+
+stop() ->
+ exit(whereis(couch_ft_query), close).
+
+execute(DatabaseName, QueryString) ->
+ gen_server:call(couch_ft_query, {ft_query, DatabaseName, QueryString}).
+
+init(QueryExec) ->
+ Port = open_port({spawn, QueryExec}, [{line, 1000}, exit_status, hide]),
+ {ok, Port}.
+
+terminate(_Reason, _Server) ->
+ ok.
+
+handle_call({ft_query, Database, QueryText}, _From, Port) ->
+ %% send the database name
+ true = port_command(Port, Database ++ "\n"),
+ true = port_command(Port, QueryText ++ "\n"),
+ case get_line(Port) of
+ "ok" ->
+ DocIds = read_query_results(Port, []),
+ {reply, {ok, DocIds}, Port};
+ "error" ->
+ ErrorId = get_line(Port),
+ ErrorMsg = get_line(Port),
+ {reply, {list_to_atom(ErrorId), ErrorMsg}, Port}
+ end.
+
+read_query_results(Port, Acc) ->
+ case get_line(Port) of
+ "" -> % line by itself means all done
+ lists:reverse(Acc);
+ DocId ->
+ Score = get_line(Port),
+ read_query_results(Port, [{DocId, Score} | Acc])
+ end.
+
+
+get_line(Port) ->
+ receive
+ {Port, {data, {eol, Line}}} ->
+ Line;
+ ?ERR_HANDLE
+ end.
+
+handle_cast(_Whatever, State) ->
+ {noreply, State}.
+
+handle_info({Port, {exit_status, Status}}, Port) ->
+ {stop, {os_process_exited, Status}, Port};
+handle_info(_Whatever, State) ->
+ {noreply, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
Added: incubator/couchdb/trunk/src/couchdb/couch_js.c
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_js.c?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_js.c (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_js.c Fri Mar 28 16:32:19 2008
@@ -0,0 +1,452 @@
+/*
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of the
+License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software distributed
+under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+
+*/
+
+#include <stdio.h>
+#include <jsapi.h>
+
+int gExitCode = 0;
+int gStackChunkSize = 8L * 1024L;
+
+int
+EncodeChar(uint8 *utf8Buffer, uint32 ucs4Char) {
+ int utf8Length = 1;
+
+ if (ucs4Char < 0x80) {
+ *utf8Buffer = (uint8)ucs4Char;
+ } else {
+ int i;
+ uint32 a = ucs4Char >> 11;
+ utf8Length = 2;
+ while (a) {
+ a >>= 5;
+ utf8Length++;
+ }
+ i = utf8Length;
+ while (--i) {
+ utf8Buffer[i] = (uint8)((ucs4Char & 0x3F) | 0x80);
+ ucs4Char >>= 6;
+ }
+ *utf8Buffer = (uint8)(0x100 - (1 << (8-utf8Length)) + ucs4Char);
+ }
+ return utf8Length;
+}
+
+JSBool
+EncodeString(const jschar *src, size_t srclen, char *dst, size_t *dstlenp) {
+ size_t i, utf8Len, dstlen = *dstlenp, origDstlen = dstlen;
+ jschar c, c2;
+ uint32 v;
+ uint8 utf8buf[6];
+
+ if (!dst)
+ dstlen = origDstlen = (size_t) -1;
+
+ while (srclen) {
+ c = *src++;
+ srclen--;
+ if ((c >= 0xDC00) && (c <= 0xDFFF))
+ goto badSurrogate;
+ if (c < 0xD800 || c > 0xDBFF) {
+ v = c;
+ } else {
+ if (srclen < 1)
+ goto bufferTooSmall;
+ c2 = *src++;
+ srclen--;
+ if ((c2 < 0xDC00) || (c2 > 0xDFFF)) {
+ c = c2;
+ goto badSurrogate;
+ }
+ v = ((c - 0xD800) << 10) + (c2 - 0xDC00) + 0x10000;
+ }
+ if (v < 0x0080) {
+ /* no encoding necessary - performance hack */
+ if (!dstlen)
+ goto bufferTooSmall;
+ if (dst)
+ *dst++ = (char) v;
+ utf8Len = 1;
+ } else {
+ utf8Len = EncodeChar(utf8buf, v);
+ if (utf8Len > dstlen)
+ goto bufferTooSmall;
+ if (dst) {
+ for (i = 0; i < utf8Len; i++)
+ *dst++ = (char) utf8buf[i];
+ }
+ }
+ dstlen -= utf8Len;
+ }
+ *dstlenp = (origDstlen - dstlen);
+ return JS_TRUE;
+
+badSurrogate:
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+
+bufferTooSmall:
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+}
+
+static uint32
+DecodeChar(const uint8 *utf8Buffer, int utf8Length) {
+ uint32 ucs4Char;
+ uint32 minucs4Char;
+ /* from Unicode 3.1, non-shortest form is illegal */
+ static const uint32 minucs4Table[] = {
+ 0x00000080, 0x00000800, 0x0001000, 0x0020000, 0x0400000
+ };
+
+ if (utf8Length == 1) {
+ ucs4Char = *utf8Buffer;
+ } else {
+ ucs4Char = *utf8Buffer++ & ((1<<(7-utf8Length))-1);
+ minucs4Char = minucs4Table[utf8Length-2];
+ while (--utf8Length) {
+ ucs4Char = ucs4Char<<6 | (*utf8Buffer++ & 0x3F);
+ }
+ if (ucs4Char < minucs4Char ||
+ ucs4Char == 0xFFFE || ucs4Char == 0xFFFF) {
+ ucs4Char = 0xFFFD;
+ }
+ }
+ return ucs4Char;
+}
+
+JSBool
+DecodeString(const char *src, size_t srclen, jschar *dst, size_t *dstlenp) {
+ uint32 v;
+ size_t offset = 0, j, n, dstlen = *dstlenp, origDstlen = dstlen;
+
+ if (!dst)
+ dstlen = origDstlen = (size_t) -1;
+
+ while (srclen) {
+ v = (uint8) *src;
+ n = 1;
+ if (v & 0x80) {
+ while (v & (0x80 >> n))
+ n++;
+ if (n > srclen)
+ goto bufferTooSmall;
+ if (n == 1 || n > 6)
+ goto badCharacter;
+ for (j = 1; j < n; j++) {
+ if ((src[j] & 0xC0) != 0x80)
+ goto badCharacter;
+ }
+ v = DecodeChar((const uint8 *) src, n);
+ if (v >= 0x10000) {
+ v -= 0x10000;
+ if (v > 0xFFFFF || dstlen < 2) {
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+ }
+ if (dstlen < 2)
+ goto bufferTooSmall;
+ if (dst) {
+ *dst++ = (jschar)((v >> 10) + 0xD800);
+ v = (jschar)((v & 0x3FF) + 0xDC00);
+ }
+ dstlen--;
+ }
+ }
+ if (!dstlen)
+ goto bufferTooSmall;
+ if (dst)
+ *dst++ = (jschar) v;
+ dstlen--;
+ offset += n;
+ src += n;
+ srclen -= n;
+ }
+ *dstlenp = (origDstlen - dstlen);
+ return JS_TRUE;
+
+badCharacter:
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+
+bufferTooSmall:
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+}
+
+static JSBool
+EvalInContext(JSContext *context, JSObject *obj, uintN argc, jsval *argv,
+ jsval *rval) {
+ JSString *str;
+ JSObject *sandbox;
+ JSContext *sub_context;
+ const jschar *src;
+ size_t srclen;
+ JSBool ok;
+ jsval v;
+
+ sandbox = NULL;
+ if (!JS_ConvertArguments(context, argc, argv, "S / o", &str, &sandbox))
+ return JS_FALSE;
+
+ sub_context = JS_NewContext(JS_GetRuntime(context), gStackChunkSize);
+ if (!sub_context) {
+ JS_ReportOutOfMemory(context);
+ return JS_FALSE;
+ }
+
+ src = JS_GetStringChars(str);
+ srclen = JS_GetStringLength(str);
+
+ if (!sandbox) {
+ sandbox = JS_NewObject(sub_context, NULL, NULL, NULL);
+ if (!sandbox || !JS_InitStandardClasses(sub_context, sandbox)) {
+ ok = JS_FALSE;
+ goto out;
+ }
+ }
+
+ if (srclen == 0) {
+ *rval = OBJECT_TO_JSVAL(sandbox);
+ ok = JS_TRUE;
+ } else {
+ ok = JS_EvaluateUCScript(sub_context, sandbox, src, srclen, NULL, -1,
+ rval);
+ }
+
+out:
+ JS_DestroyContext(sub_context);
+ return ok;
+}
+
+static JSBool
+GC(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+ JS_GC(context);
+ return JS_TRUE;
+}
+
+static JSBool
+Print(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+ uintN i, n;
+ size_t cl, bl;
+ JSString *str;
+ jschar *chars;
+ char *bytes;
+
+ for (i = n = 0; i < argc; i++) {
+ str = JS_ValueToString(context, argv[i]);
+ if (!str)
+ return JS_FALSE;
+ chars = JS_GetStringChars(str);
+ cl = JS_GetStringLength(str);
+ if (!EncodeString(chars, cl, NULL, &bl))
+ return JS_FALSE;
+ bytes = JS_malloc(context, bl + 1);
+ bytes[bl] = '\0';
+ if (!EncodeString(chars, cl, bytes, &bl)) {
+ JS_free(context, bytes);
+ return JS_FALSE;
+ }
+ fprintf(stdout, "%s%s", i ? " " : "", bytes);
+ JS_free(context, bytes);
+ }
+ n++;
+ if (n)
+ fputc('\n', stdout);
+ fflush(stdout);
+ return JS_TRUE;
+}
+
+static JSBool
+Quit(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+ JS_ConvertArguments(context, argc, argv, "/ i", &gExitCode);
+ return JS_FALSE;
+}
+
+static JSBool
+ReadLine(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+ char *bytes, *tmp;
+ jschar *chars;
+ size_t bufsize, byteslen, charslen, readlen;
+ JSString *str;
+
+ JS_MaybeGC(context);
+
+ byteslen = 0;
+ bufsize = 256;
+ bytes = JS_malloc(context, bufsize);
+ if (!bytes)
+ return JS_FALSE;
+
+ while ((readlen = js_fgets(bytes + byteslen, bufsize - byteslen, stdin)) > 0) {
+ byteslen += readlen;
+
+ /* Are we done? */
+ if (bytes[byteslen - 1] == '\n') {
+ bytes[byteslen - 1] = '\0';
+ break;
+ }
+
+ /* Else, grow our buffer for another pass */
+ tmp = JS_realloc(context, bytes, bufsize * 2);
+ if (!tmp) {
+ JS_free(context, bytes);
+ return JS_FALSE;
+ }
+
+ bufsize *= 2;
+ bytes = tmp;
+ }
+
+ /* Treat the empty string specially */
+ if (byteslen == 0) {
+ *rval = JS_GetEmptyStringValue(context);
+ JS_free(context, bytes);
+ return JS_TRUE;
+ }
+
+ /* Shrink the buffer to the real size */
+ tmp = JS_realloc(context, bytes, byteslen);
+ if (!tmp) {
+ JS_free(context, bytes);
+ return JS_FALSE;
+ }
+ bytes = tmp;
+
+ /* Decode the string from UTF-8 */
+ if (!DecodeString(bytes, byteslen, NULL, &charslen)) {
+ JS_free(context, bytes);
+ return JS_FALSE;
+ }
+ chars = JS_malloc(context, (charslen + 1) * sizeof(jschar));
+ if (!DecodeString(bytes, byteslen, chars, &charslen)) {
+ JS_free(context, bytes);
+ JS_free(context, chars);
+ return JS_FALSE;
+ }
+ chars[charslen] = '\0';
+
+ /* Initialize a JSString object */
+ str = JS_NewUCString(context, chars, charslen - 1);
+ if (!str) {
+ JS_free(context, bytes);
+ JS_free(context, chars);
+ return JS_FALSE;
+ }
+
+ *rval = STRING_TO_JSVAL(str);
+ return JS_TRUE;
+}
+
+static JSBool
+Seal(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+ JSObject *target;
+ JSBool deep = JS_FALSE;
+
+ if (!JS_ConvertArguments(context, argc, argv, "o/b", &target, &deep))
+ return JS_FALSE;
+ if (!target)
+ return JS_TRUE;
+ return JS_SealObject(context, target, deep);
+}
+
+static void
+ExecuteScript(JSContext *context, JSObject *obj, const char *filename) {
+ FILE *file;
+ JSScript *script;
+ jsval result;
+
+ if (!filename || strcmp(filename, "-") == 0) {
+ file = stdin;
+ } else {
+ file = fopen(filename, "r");
+ if (!file) {
+ fprintf(stderr, "could not open script file %s\n", filename);
+ gExitCode = 1;
+ return;
+ }
+ }
+
+ script = JS_CompileFileHandle(context, obj, filename, file);
+ if (script) {
+ JS_ExecuteScript(context, obj, script, &result);
+ JS_DestroyScript(context, script);
+ }
+}
+
+static uint32 gBranchCount = 0;
+static uint32 gBranchLimit = 100 * 1024;
+
+static JSBool
+BranchCallback(JSContext *context, JSScript *script) {
+ if (++gBranchCount == gBranchLimit) {
+ gBranchCount = 0;
+ return JS_FALSE;
+ }
+ if ((gBranchCount & 0x3fff) == 1) {
+ JS_MaybeGC(context);
+ }
+ return JS_TRUE;
+}
+
+static void
+PrintError(JSContext *context, const char *message, JSErrorReport *report) {
+ if (!report || !JSREPORT_IS_WARNING(report->flags))
+ fprintf(stderr, "%s\n", message);
+}
+
+int
+main(int argc, const char * argv[]) {
+ JSRuntime *runtime;
+ JSContext *context;
+ JSObject *global;
+
+ runtime = JS_NewRuntime(64L * 1024L * 1024L);
+ if (!runtime)
+ return 1;
+ context = JS_NewContext(runtime, gStackChunkSize);
+ if (!context)
+ return 1;
+ JS_SetErrorReporter(context, PrintError);
+ JS_SetBranchCallback(context, BranchCallback);
+ JS_ToggleOptions(context, JSOPTION_NATIVE_BRANCH_CALLBACK);
+ JS_ToggleOptions(context, JSOPTION_XML);
+
+ global = JS_NewObject(context, NULL, NULL, NULL);
+ if (!global)
+ return 1;
+ if (!JS_InitStandardClasses(context, global))
+ return 1;
+ if (!JS_DefineFunction(context, global, "evalcx", EvalInContext, 0, 0)
+ || !JS_DefineFunction(context, global, "gc", GC, 0, 0)
+ || !JS_DefineFunction(context, global, "print", Print, 0, 0)
+ || !JS_DefineFunction(context, global, "quit", Quit, 0, 0)
+ || !JS_DefineFunction(context, global, "readline", ReadLine, 0, 0)
+ || !JS_DefineFunction(context, global, "seal", Seal, 0, 0))
+ return 1;
+
+ if (argc != 2) {
+ fprintf(stderr, "incorrect number of arguments\n\n");
+ fprintf(stderr, "usage: %s <scriptfile>\n", argv[0]);
+ return 2;
+ }
+
+ ExecuteScript(context, global, argv[1]);
+
+ JS_DestroyContext(context);
+ JS_DestroyRuntime(runtime);
+ JS_ShutDown();
+
+ return gExitCode;
+}
Propchange: incubator/couchdb/trunk/src/couchdb/couch_js.c
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,139 @@
+% Copyright 2007, 2008 Damien Katz <da...@yahoo.com>
+%
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+% You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+
+-module(couch_key_tree).
+
+-export([merge/2, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]).
+-export([map/2, get_all_leafs/1, get_leaf_keys/1, count_leafs/1]).
+
+% a key tree looks like this:
+% Tree -> [] or [{Key, Value, Tree} | SiblingTree]
+% ChildTree -> Tree
+% SiblingTree -> [] or [{SiblingKey, Value, Tree} | Tree]
+% And each Key < SiblingKey
+
+
+
+% key tree functions
+
+% When the same key is found in the trees, the value in tree B is discarded.
+merge([], B) ->
+ B;
+merge(A, []) ->
+ A;
+merge([ATree | ANextTree], [BTree | BNextTree]) ->
+ {AKey, AValue, ASubTree} = ATree,
+ {BKey, _BValue, BSubTree} = BTree,
+ if
+ AKey == BKey ->
+ %same key
+ MergedSubTree = merge(ASubTree, BSubTree),
+ MergedNextTree = merge(ANextTree, BNextTree),
+ [{AKey, AValue, MergedSubTree} | MergedNextTree];
+ AKey < BKey ->
+ [ATree | merge(ANextTree, [BTree | BNextTree])];
+ true ->
+ [BTree | merge([ATree | ANextTree], BNextTree)]
+ end.
+
+find_missing(_Tree, []) ->
+ [];
+find_missing([], Keys) ->
+ Keys;
+find_missing([{Key, _, SubTree} | RestTree], Keys) ->
+ SrcKeys2 = Keys -- Key,
+ SrcKeys3 = find_missing(SubTree, SrcKeys2),
+ find_missing(RestTree, SrcKeys3).
+
+
+% get the leafs in the tree matching the keys. The matching key nodes can be
+% leafs or an inner nodes. If an inner node, then the leafs for that node
+% are returned.
+get_key_leafs(Tree, Keys) ->
+ get_key_leafs(Tree, Keys, []).
+
+get_key_leafs(_Tree, [], _KeyPathAcc) ->
+ {[], []};
+get_key_leafs([], KeysToGet, _KeyPathAcc) ->
+ {[], KeysToGet};
+get_key_leafs([{Key, _Value, SubTree}=Tree | RestTree], KeysToGet, KeyPathAcc) ->
+ case KeysToGet -- [Key] of
+ KeysToGet -> % same list, key not found
+ {LeafsFound, KeysToGet2} = get_key_leafs(SubTree, KeysToGet, [Key | KeyPathAcc]),
+ {RestLeafsFound, KeysRemaining} = get_key_leafs(RestTree, KeysToGet2, KeyPathAcc),
+ {LeafsFound ++ RestLeafsFound, KeysRemaining};
+ KeysToGet2 ->
+ LeafsFound = get_all_leafs([Tree], KeyPathAcc),
+ LeafKeysFound = [LeafKeyFound || {LeafKeyFound, _, _} <- LeafsFound],
+ KeysToGet2 = KeysToGet2 -- LeafKeysFound,
+ {RestLeafsFound, KeysRemaining} = get_key_leafs(RestTree, KeysToGet2, KeyPathAcc),
+ {LeafsFound ++ RestLeafsFound, KeysRemaining}
+ end.
+
+get(Tree, KeysToGet) ->
+ {KeyPaths, KeysNotFound} = get_full_key_paths(Tree, KeysToGet),
+ FixedResults = [ {Key, Value, [Key0 || {Key0, _} <- Path]} || [{Key, Value}|_] = Path <- KeyPaths],
+ {FixedResults, KeysNotFound}.
+
+get_full_key_paths(Tree, Keys) ->
+ get_full_key_paths(Tree, Keys, []).
+
+get_full_key_paths(_Tree, [], _KeyPathAcc) ->
+ {[], []};
+get_full_key_paths([], KeysToGet, _KeyPathAcc) ->
+ {[], KeysToGet};
+get_full_key_paths([{KeyId, Value, SubTree} | RestTree], KeysToGet, KeyPathAcc) ->
+ KeysToGet2 = KeysToGet -- [KeyId],
+ CurrentNodeResult =
+ case length(KeysToGet2) == length(KeysToGet) of
+ true -> % not in the key list.
+ [];
+ false -> % this node is the key list. return it
+ [[{KeyId, Value} | KeyPathAcc]]
+ end,
+ {KeysGotten, KeysRemaining} = get_full_key_paths(SubTree, KeysToGet2, [{KeyId, Value} | KeyPathAcc]),
+ {KeysGotten2, KeysRemaining2} = get_full_key_paths(RestTree, KeysRemaining, KeyPathAcc),
+ {CurrentNodeResult ++ KeysGotten ++ KeysGotten2, KeysRemaining2}.
+
+get_all_leafs(Tree) ->
+ get_all_leafs(Tree, []).
+
+get_all_leafs([], _KeyPathAcc) ->
+ [];
+get_all_leafs([{KeyId, Value, []} | RestTree], KeyPathAcc) ->
+ [{KeyId, Value, [KeyId | KeyPathAcc]} | get_all_leafs(RestTree, KeyPathAcc)];
+get_all_leafs([{KeyId, _Value, SubTree} | RestTree], KeyPathAcc) ->
+ get_all_leafs(SubTree, [KeyId | KeyPathAcc]) ++ get_all_leafs(RestTree, KeyPathAcc).
+
+get_leaf_keys([]) ->
+ [];
+get_leaf_keys([{Key, _Value, []} | RestTree]) ->
+ [Key | get_leaf_keys(RestTree)];
+get_leaf_keys([{_Key, _Value, SubTree} | RestTree]) ->
+ get_leaf_keys(SubTree) ++ get_leaf_keys(RestTree).
+
+count_leafs([]) ->
+ 0;
+count_leafs([{_Key, _Value, []} | RestTree]) ->
+ 1 + count_leafs(RestTree);
+count_leafs([{_Key, _Value, SubTree} | RestTree]) ->
+ count_leafs(SubTree) + count_leafs(RestTree).
+
+
+map(_Fun, []) ->
+ [];
+map(Fun, [{Key, Value, SubTree} | RestTree]) ->
+ Value2 = Fun(Key, Value),
+ [{Key, Value2, map(Fun, SubTree)} | map(Fun, RestTree)].
+
Added: incubator/couchdb/trunk/src/couchdb/couch_log.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_log.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_log.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_log.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,130 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_log).
+-behaviour(gen_event).
+
+-export([start_link/2,stop/0]).
+-export([error/1,error/2,info/1,info/2,debug/1,debug/2,get_level/0,get_level_integer/0, set_level/1]).
+-export([init/1, handle_event/2, terminate/2, code_change/3, handle_info/2, handle_call/2]).
+
+-define(LEVEL_ERROR, 3).
+-define(LEVEL_INFO, 2).
+-define(LEVEL_DEBUG, 1).
+-define(LEVEL_TMI, 0).
+
+level_integer(error) -> ?LEVEL_ERROR;
+level_integer(info) -> ?LEVEL_INFO;
+level_integer(debug) -> ?LEVEL_DEBUG;
+level_integer(tmi) -> ?LEVEL_TMI;
+level_integer(_Else) -> ?LEVEL_ERROR. % anything else default to ERROR level
+
+level_atom(?LEVEL_ERROR) -> error;
+level_atom(?LEVEL_INFO) -> info;
+level_atom(?LEVEL_DEBUG) -> debug;
+level_atom(?LEVEL_TMI) -> tmi.
+
+
+start_link(Filename, Level) ->
+ couch_event_sup:start_link({local, couch_log}, error_logger, couch_log, {Filename, Level}).
+
+stop() ->
+ couch_event_sup:stop(couch_log).
+
+init({Filename, Level}) ->
+ {ok, Fd} = file:open(Filename, [append]),
+ {ok, {Fd, level_integer(Level)}}.
+
+error(Msg) ->
+ error("~s", [Msg]).
+
+error(Format, Args) ->
+ error_logger:error_report(couch_error, {Format, Args}).
+
+info(Msg) ->
+ info("~s", [Msg]).
+
+info(Format, Args) ->
+ case get_level_integer() =< ?LEVEL_INFO of
+ true ->
+ error_logger:info_report(couch_info, {Format, Args});
+ false ->
+ ok
+ end.
+
+debug(Msg) ->
+ debug("~s", [Msg]).
+
+debug(Format, Args) ->
+ case get_level_integer() =< ?LEVEL_DEBUG of
+ true ->
+ error_logger:info_report(couch_debug, {Format, Args});
+ false ->
+ ok
+ end.
+
+set_level(LevelAtom) ->
+ set_level_integer(level_integer(LevelAtom)).
+
+get_level() ->
+ level_atom(get_level_integer()).
+
+get_level_integer() ->
+ catch gen_event:call(error_logger, couch_log, get_level_integer).
+
+set_level_integer(Int) ->
+ gen_event:call(error_logger, couch_log, {set_level_integer, Int}).
+
+handle_event({error_report, _, {Pid, couch_error, {Format, Args}}}, {Fd, _LogLevel}=State) ->
+ log(Fd, Pid, error, Format, Args),
+ {ok, State};
+handle_event({error_report, _, {Pid, _, _}}=Event, {Fd, _LogLevel}=State) ->
+ log(Fd, Pid, error, "~p", [Event]),
+ {ok, State};
+handle_event({error, _, {Pid, Format, Args}}, {Fd, _LogLevel}=State) ->
+ log(Fd, Pid, error, Format, Args),
+ {ok, State};
+handle_event({info_report, _, {Pid, couch_info, {Format, Args}}}, {Fd, LogLevel}=State)
+when LogLevel =< ?LEVEL_INFO ->
+ log(Fd, Pid, info, Format, Args),
+ {ok, State};
+handle_event({info_report, _, {Pid, couch_debug, {Format, Args}}}, {Fd, LogLevel}=State)
+when LogLevel =< ?LEVEL_DEBUG ->
+ log(Fd, Pid, debug, Format, Args),
+ {ok, State};
+handle_event({_, _, {Pid, _, _}}=Event, {Fd, LogLevel}=State)
+when LogLevel =< ?LEVEL_TMI ->
+ % log every remaining event if tmi!
+ log(Fd, Pid, tmi, "~p", [Event]),
+ {ok, State};
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_call(get_level_integer, {_Fd, LogLevel}=State) ->
+ {ok, LogLevel, State};
+handle_call({set_level_integer, NewLevel}, {Fd, _LogLevel}) ->
+ {ok, ok, {Fd, NewLevel}}.
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Arg, {Fd, _LoggingLevel}) ->
+ file:close(Fd).
+
+log(Fd, Pid, Level, Format, Args) ->
+ Msg = io_lib:format(Format, Args),
+ ok = io:format("[~s] [~p] ~s~n", [Level, Pid, Msg]), % dump to console too
+ {ok, Msg2, _} = regexp:gsub(lists:flatten(Msg),"\\r\\n|\\r|\\n", "\r\n"),
+ ok = io:format(Fd, "[~s] [~s] [~p] ~s\r~n\r~n", [httpd_util:rfc1123_date(), Level, Pid, Msg2]).
Added: incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,206 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_query_servers).
+-behaviour(gen_server).
+
+-export([start_link/1]).
+
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]).
+-export([start_doc_map/2, map_docs/2, stop_doc_map/1]).
+
+-export([test/0, test/1]).
+
+-include("couch_db.hrl").
+
+timeout() ->
+ % hardcoded 5 sec timeout per document
+ 5000.
+
+start_link(QueryServerList) ->
+ gen_server:start_link({local, couch_query_servers}, couch_query_servers, QueryServerList, []).
+
+stop() ->
+ exit(whereis(couch_query_servers), close).
+
+readline(Port) ->
+ readline(Port, []).
+
+readline(Port, Acc) ->
+ Timer = erlang:send_after(timeout(), self(), timeout),
+ Result =
+ receive
+ {Port, {data, {noeol, Data}}} ->
+ readline(Port, [Data|Acc]);
+ {Port, {data, {eol, Data}}} ->
+ lists:flatten(lists:reverse(Acc, Data));
+ {Port, Err} ->
+ catch port_close(Port),
+ erlang:cancel_timer(Timer),
+ throw({map_process_error, Err});
+ timeout ->
+ catch port_close(Port),
+ throw({map_process_error, "map function timed out"})
+ end,
+ case erlang:cancel_timer(Timer) of
+ false ->
+ % message already sent. clear it
+ receive timeout -> ok end;
+ _ ->
+ ok
+ end,
+ Result.
+
+read_json(Port) ->
+ case cjson:decode(readline(Port)) of
+ {obj, [{"log", Msg}]} when is_list(Msg) ->
+ % we got a message to log. Log it and continue
+ couch_log:info("Query Server Log Message: ~s", [Msg]),
+ read_json(Port);
+ Else ->
+ Else
+ end.
+
+writeline(Port, String) ->
+ true = port_command(Port, String ++ "\n").
+
+% send command and get a response.
+prompt(Port, Json) ->
+ writeline(Port, cjson:encode(Json)),
+ read_json(Port).
+
+
+start_doc_map(Lang, Functions) ->
+ Port =
+ case gen_server:call(couch_query_servers, {get_port, Lang}) of
+ {ok, Port0} ->
+ link(Port0),
+ Port0;
+ {empty, Cmd} ->
+ couch_log:info("Spawning new ~s instance.", [Lang]),
+ open_port({spawn, Cmd}, [stream,
+ {line, 1000},
+ exit_status,
+ hide]);
+ Error ->
+ throw(Error)
+ end,
+ true = prompt(Port, {"reset"}),
+ % send the functions as json strings
+ lists:foreach(fun(FunctionSource) ->
+ case prompt(Port, {"add_fun", FunctionSource}) of
+ true -> ok;
+ {obj, [{"error", Id}, {"reason", Reason}]} ->
+ throw({Id, Reason})
+ end
+ end,
+ Functions),
+ {ok, {Lang, Port}}.
+
+map_docs({_Lang, Port}, Docs) ->
+ % send the documents
+ Results =
+ lists:map(
+ fun(Doc) ->
+ Json = couch_doc:to_json_obj(Doc, []),
+ case prompt(Port, {"map_doc", Json}) of
+ {obj, [{"error", Id}, {"reason", Reason}]} ->
+ throw({list_to_atom(Id),Reason});
+ {obj, [{"reason", Reason}, {"error", Id}]} ->
+ throw({list_to_atom(Id),Reason});
+ Results when is_tuple(Results) ->
+ % the results are a json array of function map yields like this:
+ % {FunResults1, FunResults2 ...}
+ % where funresults is are json arrays of key value pairs:
+ % {{Key1, Value1}, {Key2, Value2}}
+ % Convert to real lists, execept the key, value pairs
+ [tuple_to_list(FunResult) || FunResult <- tuple_to_list(Results)]
+ end
+ end,
+ Docs),
+ {ok, Results}.
+
+
+stop_doc_map(nil) ->
+ ok;
+stop_doc_map({Lang, Port}) ->
+ ok = gen_server:call(couch_query_servers, {return_port, {Lang, Port}}),
+ true = unlink(Port),
+ ok.
+
+init(QueryServerList) ->
+ {ok, {QueryServerList, []}}.
+
+terminate(_Reason, _Server) ->
+ ok.
+
+
+handle_call({get_port, Lang}, {FromPid, _}, {QueryServerList, LangPorts}) ->
+ case lists:keysearch(Lang, 1, LangPorts) of
+ {value, {_, Port}=LangPort} ->
+ Result =
+ case catch port_connect(Port, FromPid) of
+ true ->
+ true = unlink(Port),
+ {ok, Port};
+ Error ->
+ catch port_close(Port),
+ Error
+ end,
+ {reply, Result, {QueryServerList, LangPorts -- [LangPort]}};
+ false ->
+ case lists:keysearch(Lang, 1, QueryServerList) of
+ {value, {_, ServerCmd}} ->
+ {reply, {empty, ServerCmd}, {QueryServerList, LangPorts}};
+ false -> % not a supported language
+ {reply, {query_language_unknown, Lang}, {QueryServerList, LangPorts}}
+ end
+ end;
+handle_call({return_port, {Lang, Port}}, _From, {QueryServerList, LangPorts}) ->
+ case catch port_connect(Port, self()) of
+ true ->
+ {reply, ok, {QueryServerList, [{Lang, Port} | LangPorts]}};
+ _ ->
+ catch port_close(Port),
+ {reply, ok, {QueryServerList, LangPorts}}
+ end.
+
+handle_cast(_Whatever, {Cmd, Ports}) ->
+ {noreply, {Cmd, Ports}}.
+
+handle_info({Port, {exit_status, Status}}, {QueryServerList, LangPorts}) ->
+ case lists:keysearch(Port, 2, LangPorts) of
+ {value, {Lang, _}} ->
+ case Status of
+ 0 -> ok;
+ _ -> couch_log:error("Abnormal shutdown of ~s query server process (exit_status: ~w).", [Lang, Status])
+ end,
+ {noreply, {QueryServerList, lists:keydelete(Port, 2, LangPorts)}};
+ _ ->
+ couch_log:error("Unknown linked port/process crash: ~p", [Port])
+ end;
+handle_info(_Whatever, {Cmd, Ports}) ->
+ {noreply, {Cmd, Ports}}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+test() ->
+ test("../js/js -f main.js").
+
+test(Cmd) ->
+ start_link(Cmd),
+ {ok, DocMap} = start_doc_map("javascript", ["function(doc) {if (doc[0] == 'a') return doc[1];}"]),
+ {ok, Results} = map_docs(DocMap, [#doc{body={"a", "b"}}, #doc{body={"c", "d"}},#doc{body={"a", "c"}}]),
+ io:format("Results: ~w~n", [Results]),
+ stop_doc_map(DocMap),
+ ok.
Added: incubator/couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_rep.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_rep.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_rep.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,308 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep).
+
+-include("couch_db.hrl").
+
+-export([replicate/2, replicate/3, test/0, test_write_docs/3]).
+
+-record(stats, {
+ docs_read=0,
+ read_errors=0,
+ docs_copied=0,
+ copy_errors=0
+ }).
+
+
+url_encode([H|T]) ->
+ if
+ H >= $a, $z >= H ->
+ [H|url_encode(T)];
+ H >= $A, $Z >= H ->
+ [H|url_encode(T)];
+ H >= $0, $9 >= H ->
+ [H|url_encode(T)];
+ H == $_; H == $.; H == $-; H == $: ->
+ [H|url_encode(T)];
+ true ->
+ case lists:flatten(io_lib:format("~.16.0B", [H])) of
+ [X, Y] ->
+ [$%, X, Y | url_encode(T)];
+ [X] ->
+ [$%, $0, X | url_encode(T)]
+ end
+ end;
+url_encode([]) ->
+ [].
+
+
+replicate(DbNameA, DbNameB) ->
+ replicate(DbNameA, DbNameB, []).
+
+replicate(Source, Target, Options) ->
+ {ok, DbSrc} = open_db(Source),
+ {ok, DbTgt} = open_db(Target),
+ {ok, HostName} = inet:gethostname(),
+
+ RepRecKey = ?LOCAL_DOC_PREFIX ++ HostName ++ ":" ++ Source ++ ":" ++ Target,
+ StartTime = httpd_util:rfc1123_date(),
+ RepRecSrc =
+ case open_doc(DbSrc, RepRecKey, []) of
+ {ok, SrcDoc} -> SrcDoc;
+ _ -> #doc{id=RepRecKey}
+ end,
+
+ RepRecTgt =
+ case open_doc(DbTgt, RepRecKey, []) of
+ {ok, TgtDoc} -> TgtDoc;
+ _ -> #doc{id=RepRecKey}
+ end,
+
+ #doc{body={obj,OldRepHistoryProps}} = RepRecSrc,
+ #doc{body={obj,OldRepHistoryPropsTrg}} = RepRecTgt,
+
+ SeqNum0 =
+ case OldRepHistoryProps == OldRepHistoryPropsTrg of
+ true ->
+ % if the records are identical, then we have a valid replication history
+ proplists:get_value("source_last_seq", OldRepHistoryProps, 0);
+ false ->
+ 0
+ end,
+
+ SeqNum =
+ case proplists:get_value(full, Options, false)
+ orelse proplists:get_value("full", Options, false) of
+ true -> 0;
+ false -> SeqNum0
+ end,
+
+ {NewSeqNum, Stats} = pull_rep(DbTgt, DbSrc, SeqNum, #stats{}),
+ case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
+ true ->
+ % nothing changed, don't record results
+ {ok, {obj, OldRepHistoryProps}};
+ false ->
+ HistEntries =[
+ {obj,
+ [{"start_time", StartTime},
+ {"end_time", httpd_util:rfc1123_date()},
+ {"start_last_seq", SeqNum},
+ {"end_last_seq", NewSeqNum},
+ {"docs_read", Stats#stats.docs_read},
+ {"read_errors", Stats#stats.read_errors},
+ {"docs_copied", Stats#stats.docs_copied},
+ {"copy_errors", Stats#stats.copy_errors}]}
+ | tuple_to_list(proplists:get_value("history", OldRepHistoryProps, {}))],
+ % something changed, record results
+ NewRepHistory =
+ {obj,
+ [{"session_id", couch_util:new_uuid()},
+ {"source_last_seq", NewSeqNum},
+ {"history", list_to_tuple(lists:sublist(HistEntries, 50))}]},
+
+ {ok, _} = update_doc(DbSrc, RepRecSrc#doc{body=NewRepHistory}, []),
+ {ok, _} = update_doc(DbTgt, RepRecTgt#doc{body=NewRepHistory}, []),
+ {ok, NewRepHistory}
+ end.
+
+pull_rep(DbTarget, DbSource, SourceSeqNum, Stats) ->
+ {ok, NewSeq} =
+ enum_docs_since(DbSource, SourceSeqNum,
+ fun(#doc_info{update_seq=Seq}=SrcDocInfo, _, {_, AccStats}) ->
+ Stats2 = maybe_save_docs(DbTarget, DbSource, SrcDocInfo, AccStats),
+ {ok, {Seq, Stats2}}
+ end, {SourceSeqNum, Stats}),
+ NewSeq.
+
+
+maybe_save_docs(DbTarget, DbSource,
+ #doc_info{id=Id, rev=Rev, conflict_revs=Conflicts, deleted_conflict_revs=DelConflicts},
+ Stats) ->
+ SrcRevs = [Rev | Conflicts] ++ DelConflicts,
+ {ok, [{Id, MissingRevs}]} = get_missing_revs(DbTarget, [{Id, SrcRevs}]),
+
+ case MissingRevs of
+ [] ->
+ Stats;
+ _Else ->
+ % the 'ok' below validates no unrecoverable errors (like network failure, etc).
+ {ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]),
+
+ Docs = [RevDoc || {ok, RevDoc} <- DocResults], % only match successful loads
+
+ Stats2 = Stats#stats{
+ docs_read=Stats#stats.docs_read + length(Docs),
+ read_errors=Stats#stats.read_errors + length(DocResults) - length(Docs)},
+
+ case Docs of
+ [] ->
+ Stats2;
+ _ ->
+ % the 'ok' below validates no unrecoverable errors (like network failure, etc).
+ ok = save_docs(DbTarget, Docs, []),
+ Stats2#stats{docs_copied=Stats2#stats.docs_copied+length(Docs)}
+ end
+ end.
+
+
+do_http_request(Url, Action) ->
+ do_http_request(Url, Action, []).
+
+do_http_request(Url, Action, JsonBody) ->
+ couch_log:debug("couch_rep HTTP client request:"),
+ couch_log:debug("\tAction: ~p", [Action]),
+ couch_log:debug("\tUrl: ~p", [Url]),
+ Request =
+ case JsonBody of
+ [] ->
+ {Url, []};
+ _ ->
+ {Url, [], "application/json; charset=utf-8", lists:flatten(cjson:encode(JsonBody))}
+ end,
+ {ok, {{_, ResponseCode,_},_Headers, ResponseBody}} = http:request(Action, Request, [], []),
+ if
+ ResponseCode >= 200, ResponseCode < 500 ->
+ cjson:decode(ResponseBody)
+ end.
+
+enum_docs0(_InFun, [], Acc) ->
+ Acc;
+enum_docs0(InFun, [DocInfo | Rest], Acc) ->
+ case InFun(DocInfo, 0, Acc) of
+ {ok, Acc2} -> enum_docs0(InFun, Rest, Acc2);
+ {stop, Acc2} -> Acc2
+ end.
+
+open_db("http" ++ DbName)->
+ case lists:last(DbName) of
+ $/ ->
+ {ok, "http" ++ DbName};
+ _ ->
+ {ok, "http" ++ DbName ++ "/"}
+ end;
+open_db(DbName)->
+ couch_server:open(DbName).
+
+
+enum_docs_since(DbUrl, StartSeq, InFun, InAcc) when is_list(DbUrl) ->
+ Url = DbUrl ++ "_all_docs_by_seq?startkey=" ++ integer_to_list(StartSeq),
+ {obj, Results} = do_http_request(Url, get),
+ DocInfoList=
+ lists:map(fun({obj, RowInfoList}) ->
+ {obj, RowValueProps} = proplists:get_value("value", RowInfoList),
+ #doc_info{
+ id=proplists:get_value("id", RowInfoList),
+ rev=proplists:get_value("rev", RowValueProps),
+ update_seq = proplists:get_value("key", RowInfoList),
+ conflict_revs =
+ tuple_to_list(proplists:get_value("conflicts", RowValueProps, {})),
+ deleted_conflict_revs =
+ tuple_to_list(proplists:get_value("deleted_conflicts", RowValueProps, {})),
+ deleted = proplists:get_value("deleted", RowValueProps, false)}
+ end, tuple_to_list(proplists:get_value("rows", Results))),
+ {ok, enum_docs0(InFun, DocInfoList, InAcc)};
+enum_docs_since(DbSource, StartSeq, Fun, Acc) ->
+ couch_db:enum_docs_since(DbSource, StartSeq, Fun, Acc).
+
+get_missing_revs(DbUrl, DocIdRevsList) when is_list(DbUrl) ->
+ JsonDocIdRevsList = {obj,
+ [{Id, list_to_tuple(RevList)} || {Id, RevList} <- DocIdRevsList]},
+ {obj, ResponseMembers} =
+ do_http_request(DbUrl ++ "_missing_revs",
+ post, JsonDocIdRevsList),
+ {obj, DocMissingRevsList} = proplists:get_value("missing_revs", ResponseMembers),
+ {ok, [{Id, tuple_to_list(MissingRevs)} || {Id, MissingRevs} <- DocMissingRevsList]};
+get_missing_revs(Db, DocId) ->
+ couch_db:get_missing_revs(Db, DocId).
+
+
+update_doc(DbUrl, #doc{id=DocId}=Doc, _Options) when is_list(DbUrl) ->
+ Url = DbUrl ++ url_encode(DocId),
+ {obj, ResponseMembers} =
+ do_http_request(Url, put, couch_doc:to_json_obj(Doc, [revs,attachments])),
+ RevId = proplists:get_value("_rev", ResponseMembers),
+ {ok, RevId};
+update_doc(Db, Doc, Options) ->
+ couch_db:update_doc(Db, Doc, Options).
+
+save_docs(_, [], _) ->
+ ok;
+save_docs(DbUrl, Docs, []) when is_list(DbUrl) ->
+ JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
+ {obj, Returned} =
+ do_http_request(DbUrl ++ "_bulk_docs", post, {obj, [{new_edits, false}, {docs, list_to_tuple(JsonDocs)}]}),
+ true = proplists:get_value("ok", Returned),
+ ok;
+save_docs(Db, Docs, Options) ->
+ couch_db:save_docs(Db, Docs, Options).
+
+
+open_doc(DbUrl, DocId, []) when is_list(DbUrl) ->
+ case do_http_request(DbUrl ++ url_encode(DocId), get) of
+ {obj, [{"error", ErrId}, {"reason", Reason}]} ->
+ {list_to_atom(ErrId), Reason};
+ Doc ->
+ {ok, couch_doc:from_json_obj(Doc)}
+ end;
+open_doc(Db, DocId, Options) when not is_list(Db) ->
+ couch_db:open_doc(Db, DocId, Options).
+
+
+open_doc_revs(DbUrl, DocId, Revs, Options) when is_list(DbUrl) ->
+ QueryOptionStrs =
+ lists:map(fun(latest) ->
+ % latest is only option right now
+ "latest=true"
+ end, Options),
+ RevsQueryStrs = lists:flatten(cjson:encode(list_to_tuple(Revs))),
+ Url = DbUrl ++ DocId ++ "?" ++ couch_util:implode(["revs=true", "attachments=true", "open_revs=" ++ RevsQueryStrs ] ++ QueryOptionStrs, "&"),
+ JsonResults = do_http_request(Url, get, []),
+ Results =
+ lists:map(
+ fun({obj, [{"missing", Rev}]}) ->
+ {{not_found, missing}, Rev};
+ ({obj, [{"ok", JsonDoc}]}) ->
+ {ok, couch_doc:from_json_obj(JsonDoc)}
+ end, tuple_to_list(JsonResults)),
+ {ok, Results};
+open_doc_revs(Db, DocId, Revs, Options) ->
+ couch_db:open_doc_revs(Db, DocId, Revs, Options).
+
+
+
+
+
+test() ->
+ couch_server:start(),
+ %{ok, LocalA} = couch_server:open("replica_a"),
+ {ok, LocalA} = couch_server:create("replica_a", [overwrite]),
+ {ok, _} = couch_server:create("replica_b", [overwrite]),
+ %DbA = "replica_a",
+ DbA = "http://localhost:5984/replica_a/",
+ %DbB = "replica_b",
+ DbB = "http://localhost:5984/replica_b/",
+ _DocUnids = test_write_docs(10, LocalA, []),
+ replicate(DbA, DbB),
+ %{ok, _Rev} = couch_db:delete_doc(LocalA, lists:nth(1, DocUnids), any),
+ % replicate(DbA, DbB),
+ ok.
+
+test_write_docs(0, _Db, Output) ->
+ lists:reverse(Output);
+test_write_docs(N, Db, Output) ->
+ Doc = #doc{
+ id=integer_to_list(N),
+ body={obj, [{"foo", integer_to_list(N)}, {"num", N}, {"bar", "blah"}]}},
+ couch_db:save_doc(Db, Doc, []),
+ test_write_docs(N-1, Db, [integer_to_list(N) | Output]).
Added: incubator/couchdb/trunk/src/couchdb/couch_server.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_server.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_server.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_server.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,215 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_server).
+-behaviour(gen_server).
+-behaviour(application).
+
+-export([start/0,start/1,start/2,stop/0,stop/1]).
+-export([open/1,create/2,delete/1,all_databases/0,get_version/0]).
+-export([init/1, handle_call/3,sup_start_link/2]).
+-export([handle_cast/2,code_change/3,handle_info/2,terminate/2]).
+-export([dev_start/0,remote_restart/0]).
+
+-include("couch_db.hrl").
+
+-record(server,{
+ root_dir = [],
+ dbname_regexp,
+ options=[]
+ }).
+
+start() ->
+ start("").
+
+start(IniFile) when is_atom(IniFile) ->
+ couch_server_sup:start_link(atom_to_list(IniFile) ++ ".ini");
+start(IniNum) when is_integer(IniNum) ->
+ couch_server_sup:start_link("couch" ++ integer_to_list(IniNum) ++ ".ini");
+start(IniFile) ->
+ couch_server_sup:start_link(IniFile).
+
+start(_Type, _Args) ->
+ start().
+
+stop() ->
+ couch_server_sup:stop().
+
+stop(_Reason) ->
+ stop().
+
+dev_start() ->
+ stop(),
+ up_to_date = make:all([load, debug_info]),
+ start().
+
+get_version() ->
+ Apps = application:loaded_applications(),
+ case lists:keysearch(couch, 1, Apps) of
+ {value, {_, _, Vsn}} ->
+ Vsn;
+ false ->
+ "0.0.0"
+ end.
+
+sup_start_link(RootDir, Options) ->
+ gen_server:start_link({local, couch_server}, couch_server, {RootDir, Options}, []).
+
+open(Filename) ->
+ gen_server:call(couch_server, {open, Filename}).
+
+create(Filename, Options) ->
+ gen_server:call(couch_server, {create, Filename, Options}).
+
+delete(Filename) ->
+ gen_server:call(couch_server, {delete, Filename}).
+
+remote_restart() ->
+ gen_server:call(couch_server, remote_restart).
+
+init({RootDir, Options}) ->
+ {ok, RegExp} = regexp:parse("^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"),
+ {ok, #server{root_dir=RootDir, dbname_regexp=RegExp, options=Options}}.
+
+check_filename(#server{dbname_regexp=RegExp}, Filename) ->
+ case regexp:match(Filename, RegExp) of
+ nomatch ->
+ {error, illegal_database_name};
+ _Match ->
+ ok
+ end.
+
+get_full_filename(Server, Filename) ->
+ filename:join([Server#server.root_dir, "./" ++ Filename ++ ".couch"]).
+
+
+terminate(_Reason, _Server) ->
+ ok.
+
+all_databases() ->
+ {ok, Root} = gen_server:call(couch_server, get_root),
+ Filenames =
+ filelib:fold_files(Root, "^[a-z0-9\\_\\$()\\+\\-]*[\\.]couch$", true,
+ fun(Filename, AccIn) ->
+ case Filename -- Root of
+ [$/ | RelativeFilename] -> ok;
+ RelativeFilename -> ok
+ end,
+ [filename:rootname(RelativeFilename, ".couch") | AccIn]
+ end, []),
+ {ok, Filenames}.
+
+
+handle_call(get_root, _From, #server{root_dir=Root}=Server) ->
+ {reply, {ok, Root}, Server};
+handle_call({open, Filename}, From, Server) ->
+ case check_filename(Server, Filename) of
+ {error, Error} ->
+ {reply, {error, Error}, Server};
+ ok ->
+ Filepath = get_full_filename(Server, Filename),
+ Result = supervisor:start_child(couch_server_sup,
+ {Filename,
+ {couch_db, open, [Filename, Filepath]},
+ transient ,
+ infinity,
+ supervisor,
+ [couch_db]}),
+ case Result of
+ {ok, Db} ->
+ {reply, {ok, Db}, Server};
+ {error, already_present} ->
+ ok = supervisor:delete_child(couch_server_sup, Filename),
+ % call self recursively
+ handle_call({open, Filename}, From, Server);
+ {error, {already_started, Db}} ->
+ {reply, {ok, Db}, Server};
+ {error, {not_found, _}} ->
+ {reply, not_found, Server};
+ {error, {Error, _}} ->
+ {reply, {error, Error}, Server}
+ end
+ end;
+handle_call({create, Filename, Options}, _From, Server) ->
+ case check_filename(Server, Filename) of
+ {error, Error} ->
+ {reply, {error, Error}, Server};
+ ok ->
+ Filepath = get_full_filename(Server, Filename),
+ ChildSpec = {Filename,
+ {couch_db, create, [Filename, Filepath, Options]},
+ transient,
+ infinity,
+ supervisor,
+ [couch_db]},
+ Result =
+ case supervisor:delete_child(couch_server_sup, Filename) of
+ ok ->
+ sup_start_child(couch_server_sup, ChildSpec);
+ {error, not_found} ->
+ sup_start_child(couch_server_sup, ChildSpec);
+ {error, running} ->
+ % a server process for this database already started. Maybe kill it
+ case lists:member(overwrite, Options) of
+ true ->
+ supervisor:terminate_child(couch_server_sup, Filename),
+ ok = supervisor:delete_child(couch_server_sup, Filename),
+ sup_start_child(couch_server_sup, ChildSpec);
+ false ->
+ {error, database_already_exists}
+ end
+ end,
+ case Result of
+ {ok, _Db} -> couch_db_update_notifier:notify({created, Filename});
+ _ -> ok
+ end,
+ {reply, Result, Server}
+ end;
+handle_call({delete, Filename}, _From, Server) ->
+ FullFilepath = get_full_filename(Server, Filename),
+ supervisor:terminate_child(couch_server_sup, Filename),
+ supervisor:delete_child(couch_server_sup, Filename),
+ case file:delete(FullFilepath) of
+ ok ->
+ couch_db_update_notifier:notify({deleted, Filename}),
+ {reply, ok, Server};
+ {error, enoent} ->
+ {reply, not_found, Server};
+ Else ->
+ {reply, Else, Server}
+ end;
+handle_call(remote_restart, _From, #server{options=Options}=Server) ->
+ case proplists:get_value(remote_restart, Options) of
+ true ->
+ exit(self(), restart);
+ _ ->
+ ok
+ end,
+ {reply, ok, Server}.
+
+% this function is just to strip out the child spec error stuff if hit
+sup_start_child(couch_server_sup, ChildSpec) ->
+ case supervisor:start_child(couch_server_sup, ChildSpec) of
+ {error, {Error, _ChildInfo}} ->
+ {error, Error};
+ Else ->
+ Else
+ end.
+
+handle_cast(_Msg, State) ->
+ {noreply,State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
Added: incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,185 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_server_sup).
+-behaviour(supervisor).
+
+-define(DEFAULT_INI, "couch.ini").
+
+-export([start_link/1,stop/0]).
+
+%% supervisor callbacks
+-export([init/1]).
+
+start_link(IniFilename) ->
+ case whereis(couch_server_sup) of
+ undefined ->
+ start_server(IniFilename);
+ _Else ->
+ {error, already_started}
+ end.
+
+start_server("") ->
+ % no ini file specified, check the command line args
+ IniFile =
+ case init:get_argument(couchini) of
+ {ok, [CmdLineIniFilename]} ->
+ CmdLineIniFilename;
+ _Else ->
+ ?DEFAULT_INI
+ end,
+ start_server(IniFile);
+start_server(InputIniFilename) ->
+
+ case init:get_argument(pidfile) of
+ {ok, [PidFile]} ->
+ case file:write_file(PidFile, os:getpid()) of
+ ok -> ok;
+ Error -> io:format("Failed to write PID file ~s, error: ~p", [PidFile, Error])
+ end;
+ _ -> ok
+ end,
+
+ {ok, Cwd} = file:get_cwd(),
+ IniFilename = couch_util:abs_pathname(InputIniFilename),
+ IniBin =
+ case file:read_file(IniFilename) of
+ {ok, IniBin0} ->
+ IniBin0;
+ {error, enoent} ->
+ Msg = io_lib:format("Couldn't find server configuration file ~s.", [InputIniFilename]),
+ io:format("~s~n", [Msg]),
+ throw({startup_error, Msg})
+ end,
+ {ok, Ini} = couch_util:parse_ini(binary_to_list(IniBin)),
+
+ ConsoleStartupMsg = proplists:get_value({"Couch", "ConsoleStartupMsg"}, Ini, "Apache CouchDB is starting."),
+ LogLevel = list_to_atom(proplists:get_value({"Couch", "LogLevel"}, Ini, "error")),
+ DbRootDir = proplists:get_value({"Couch", "DbRootDir"}, Ini, "."),
+ HttpConfigFile = proplists:get_value({"Couch", "HttpConfigFile"}, Ini, "couch_httpd.conf"),
+ LogFile = proplists:get_value({"Couch", "LogFile"}, Ini, "couchdb.log"),
+ UtilDriverDir = proplists:get_value({"Couch", "UtilDriverDir"}, Ini, ""),
+ UpdateNotifierExes = proplists:get_all_values({"Couch", "DbUpdateNotificationProcess"}, Ini),
+ FtSearchQueryServer = proplists:get_value({"Couch", "FullTextSearchQueryServer"}, Ini, ""),
+ RemoteRestart = list_to_atom(proplists:get_value({"Couch", "AllowRemoteRestart"}, Ini, "undefined")),
+ ServerOptions = [{remote_restart, RemoteRestart}],
+ QueryServers = [{Lang, QueryExe} || {{"Couch Query Servers", Lang}, QueryExe} <- Ini],
+
+ ChildProcesses =
+ [{couch_log,
+ {couch_log, start_link, [LogFile, LogLevel]},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_server]},
+ {couch_db_update_event,
+ {gen_event, start_link, [{local, couch_db_update}]},
+ permanent,
+ 1000,
+ supervisor,
+ dynamic},
+ {couch_server,
+ {couch_server, sup_start_link, [DbRootDir, ServerOptions]},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_server]},
+ {couch_util,
+ {couch_util, start_link, [UtilDriverDir]},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_util]},
+ {couch_query_servers,
+ {couch_query_servers, start_link, [QueryServers]},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_query_servers]},
+ {couch_view,
+ {couch_view, start_link, [DbRootDir]},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_view]},
+ {httpd,
+ {httpd, start_link, [HttpConfigFile]},
+ permanent,
+ 1000,
+ supervisor,
+ [httpd]}
+ ] ++
+ lists:map(fun(UpdateNotifierExe) ->
+ {UpdateNotifierExe,
+ {couch_db_update_notifier, start_link, [UpdateNotifierExe]},
+ permanent,
+ 1000,
+ supervisor,
+ [couch_db_update_notifier]}
+ end, UpdateNotifierExes)
+ ++
+ case FtSearchQueryServer of
+ "" ->
+ [];
+ _ ->
+ [{couch_ft_query,
+ {couch_ft_query, start_link, [FtSearchQueryServer]},
+ permanent,
+ 1000,
+ supervisor,
+ [httpd]}]
+ end,
+
+ io:format("couch ~s (LogLevel=~s)~n", [couch_server:get_version(), LogLevel]),
+ io:format("~s~n", [ConsoleStartupMsg]),
+
+ process_flag(trap_exit, true),
+ StartResult = (catch supervisor:start_link(
+ {local, couch_server_sup}, couch_server_sup, ChildProcesses)),
+
+ ConfigInfo = io_lib:format("Config Info ~s:~n\tCurrentWorkingDir=~s~n" ++
+ "\tDbRootDir=~s~n" ++
+ "\tHttpConfigFile=~s~n" ++
+ "\tLogFile=~s~n" ++
+ "\tUtilDriverDir=~s~n" ++
+ "\tDbUpdateNotificationProcesses=~s~n" ++
+ "\tFullTextSearchQueryServer=~s~n" ++
+ "~s",
+ [IniFilename,
+ Cwd,
+ DbRootDir,
+ HttpConfigFile,
+ LogFile,
+ UtilDriverDir,
+ UpdateNotifierExes,
+ FtSearchQueryServer,
+ [lists:flatten(io_lib:format("\t~s=~s~n", [Lang, QueryExe])) || {Lang, QueryExe} <- QueryServers]]),
+ couch_log:debug("~s", [ConfigInfo]),
+
+ case StartResult of
+ {ok,_} ->
+ % only output when startup was successful
+ io:format("Apache CouchDB has started. Time to relax.~n");
+ _ ->
+ % Since we failed startup, unconditionally dump configuration data to console
+ io:format("~s", [ConfigInfo]),
+ ok
+ end,
+ process_flag(trap_exit, false),
+ StartResult.
+
+stop() ->
+ catch exit(whereis(couch_server_sup), normal),
+ couch_log:stop().
+
+init(ChildProcesses) ->
+ {ok, {{one_for_one, 10, 3600}, ChildProcesses}}.
Added: incubator/couchdb/trunk/src/couchdb/couch_stream.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_stream.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_stream.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_stream.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,252 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_stream).
+-behaviour(gen_server).
+
+-export([test/1]).
+-export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1, foldl/5]).
+-export([copy/4]).
+-export([ensure_buffer/2, set_min_buffer/2]).
+-export([init/1, terminate/2, handle_call/3]).
+-export([handle_cast/2,code_change/3,handle_info/2]).
+
+-include("couch_db.hrl").
+
+-define(FILE_POINTER_BYTES, 8).
+-define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)).
+
+-define(STREAM_OFFSET_BYTES, 4).
+-define(STREAM_OFFSET_BITS, 8*(?STREAM_OFFSET_BYTES)).
+
+-define(HUGE_CHUNK, 1000000000). % Huge chuck size when reading all in one go
+
+-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
+
+
+-record(write_stream,
+ {fd = 0,
+ current_pos = 0,
+ bytes_remaining = 0,
+ next_alloc = 0,
+ min_alloc = 16#00010000
+ }).
+
+-record(stream,
+ {
+ pid,
+ fd
+ }).
+
+
+%%% Interface functions %%%
+
+open(Fd) ->
+ open(nil, Fd).
+
+open(nil, Fd) ->
+ open({0,0}, Fd);
+open(State, Fd) ->
+ {ok, Pid} = gen_server:start_link(couch_stream, {State, Fd}, []),
+ {ok, #stream{pid = Pid, fd = Fd}}.
+
+close(#stream{pid = Pid, fd = _Fd}) ->
+ gen_server:call(Pid, close).
+
+get_state(#stream{pid = Pid, fd = _Fd}) ->
+ gen_server:call(Pid, get_state).
+
+ensure_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
+ gen_server:call(Pid, {ensure_buffer, Bytes}).
+
+set_min_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
+ gen_server:call(Pid, {set_min_buffer, Bytes}).
+
+read(#stream{pid = _Pid, fd = Fd}, Sp, Num) ->
+ read(Fd, Sp, Num);
+read(Fd, Sp, Num) ->
+ {ok, RevBin, Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> {ok, [Bin | Acc]} end, []),
+ Bin = list_to_binary(lists:reverse(RevBin)),
+ {ok, Bin, Sp2}.
+
+copy(#stream{pid = _Pid, fd = Fd}, Sp, Num, DestStream) ->
+ copy(Fd, Sp, Num, DestStream);
+copy(Fd, Sp, Num, DestStream) ->
+ {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK,
+ fun(Bin, AccPointer) ->
+ {ok, NewPointer} = write(Bin, DestStream),
+ if AccPointer == null -> NewPointer; true -> AccPointer end
+ end,
+ null),
+ {ok, NewSp}.
+
+foldl(#stream{pid = _Pid, fd = Fd}, Sp, Num, Fun, Acc) ->
+ foldl(Fd, Sp, Num, Fun, Acc);
+foldl(Fd, Sp, Num, Fun, Acc) ->
+ {ok, _Acc, _Sp} = stream_data(Fd, Sp, Num, ?DEFAULT_STREAM_CHUNK, Fun, Acc).
+
+read_term(#stream{pid = _Pid, fd = Fd}, Sp) ->
+ read_term(Fd, Sp);
+read_term(Fd, Sp) ->
+ {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2}
+ = read(Fd, Sp, ?STREAM_OFFSET_BYTES),
+ {ok, Bin, _Sp3} = read(Fd, Sp2, TermLen),
+ {ok, binary_to_term(Bin)}.
+
+write_term(Stream, Term) ->
+ Bin = term_to_binary(Term),
+ Size = size(Bin),
+ Bin2 = <<Size:(?STREAM_OFFSET_BITS), Bin/binary>>,
+ write(Stream, Bin2).
+
+write(#stream{}, <<>>) ->
+ {ok, {0,0}};
+write(#stream{pid = Pid}, Bin) when is_binary(Bin) ->
+ gen_server:call(Pid, {write, Bin}).
+
+
+init({{Pos, BytesRemaining}, Fd}) ->
+ {ok, #write_stream
+ {fd = Fd,
+ current_pos = Pos,
+ bytes_remaining = BytesRemaining
+ }}.
+
+terminate(_Reason, _Stream) ->
+ ok.
+
+handle_call(get_state, _From, Stream) ->
+ #write_stream{current_pos = Pos, bytes_remaining = BytesRemaining} = Stream,
+ {reply, {Pos, BytesRemaining}, Stream};
+handle_call({set_min_buffer, MinBuffer}, _From, Stream) ->
+ {reply, ok, Stream#write_stream{min_alloc = MinBuffer}};
+handle_call({ensure_buffer, BufferSizeRequested}, _From, Stream) ->
+ #write_stream{bytes_remaining = BytesRemainingInCurrentBuffer} = Stream,
+ case BytesRemainingInCurrentBuffer < BufferSizeRequested of
+ true -> NextAlloc = BufferSizeRequested - BytesRemainingInCurrentBuffer;
+ false -> NextAlloc = 0 % enough room in current segment
+ end,
+ {reply, ok, Stream#write_stream{next_alloc = NextAlloc}};
+handle_call({write, Bin}, _From, Stream) ->
+ % ensure init is called first so we can get a pointer to the begining of the binary
+ {ok, Sp, Stream2} = write_data(Stream, Bin),
+ {reply, {ok, Sp}, Stream2};
+handle_call(close, _From, Stream) ->
+ #write_stream{current_pos=Pos, bytes_remaining = BytesRemaining} = Stream,
+ {stop, normal, {ok, {Pos, BytesRemaining}}, Stream}.
+
+handle_cast(_Msg, State) ->
+ {noreply,State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%% Internal function %%%
+
+stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
+ {ok, Acc, Sp};
+stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
+ {ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>}
+ = couch_file:pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
+ Sp = {NextPos, NextOffset},
+ % Check NextPos is past current Pos (this is always true in a stream)
+ % Guards against potential infinite loops caused by corruption.
+ case NextPos > Pos of
+ true -> ok;
+ false -> throw({error, stream_corruption})
+ end,
+ stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc);
+stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
+ ReadAmount = lists:min([MaxChunk, Num, Offset]),
+ {ok, Bin} = couch_file:pread(Fd, Pos, ReadAmount),
+ Sp = {Pos + ReadAmount, Offset - ReadAmount},
+ case Fun(Bin, Acc) of
+ {ok, Acc2} ->
+ stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Acc2);
+ {stop, Acc2} ->
+ {ok, Acc2, Sp}
+ end.
+
+write_data(Stream, <<>>) ->
+ {ok, {0,0}, Stream};
+write_data(#write_stream{bytes_remaining=0} = Stream, Bin) ->
+ #write_stream {
+ fd = Fd,
+ current_pos = CurrentPos,
+ next_alloc = NextAlloc,
+ min_alloc = MinAlloc
+ }= Stream,
+
+ NewSize = lists:max([MinAlloc, NextAlloc, size(Bin)]),
+ % no space in the current segment, must alloc a new segment
+ {ok, NewPos} = couch_file:expand(Fd, NewSize + ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
+
+ case CurrentPos of
+ 0 ->
+ ok;
+ _ ->
+ ok = couch_file:pwrite(Fd, CurrentPos, <<NewPos:(?FILE_POINTER_BITS), NewSize:(?STREAM_OFFSET_BITS)>>)
+ end,
+ Stream2 = Stream#write_stream{
+ current_pos=NewPos,
+ bytes_remaining=NewSize,
+ next_alloc=0},
+ write_data(Stream2, Bin);
+write_data(#write_stream{fd=Fd, current_pos=Pos, bytes_remaining=BytesRemaining} = Stream, Bin) ->
+ BytesToWrite = lists:min([size(Bin), BytesRemaining]),
+ {WriteBin, Rest} = split_binary(Bin, BytesToWrite),
+ ok = couch_file:pwrite(Fd, Pos, WriteBin),
+ Stream2 = Stream#write_stream{
+ bytes_remaining=BytesRemaining - BytesToWrite,
+ current_pos=Pos + BytesToWrite
+ },
+ {ok, _, Stream3} = write_data(Stream2, Rest),
+ {ok, {Pos, BytesRemaining}, Stream3}.
+
+
+
+%%% Tests %%%
+
+
+test(Term) ->
+ {ok, Fd} = couch_file:open("foo", [write]),
+ {ok, Stream} = open({0,0}, Fd),
+ {ok, Pos} = write_term(Stream, Term),
+ {ok, Pos2} = write_term(Stream, {Term, Term}),
+ close(Stream),
+ couch_file:close(Fd),
+ {ok, Fd2} = couch_file:open("foo", [read, write]),
+ {ok, Stream2} = open({0,0}, Fd2),
+ {ok, Term1} = read_term(Fd2, Pos),
+ io:format("Term1: ~w ~n",[Term1]),
+ {ok, Term2} = read_term(Fd2, Pos2),
+ io:format("Term2: ~w ~n",[Term2]),
+ {ok, PointerList} = deep_write_test(Stream2, Term, 1000, []),
+ deep_read_test(Fd2, PointerList),
+ close(Stream2),
+ couch_file:close(Fd2).
+
+deep_read_test(_Fd, []) ->
+ ok;
+deep_read_test(Fd, [Pointer | RestPointerList]) ->
+ {ok, _Term} = read_term(Fd, Pointer),
+ deep_read_test(Fd, RestPointerList).
+
+deep_write_test(_Stream, _Term, 0, PointerList) ->
+ {ok, PointerList};
+deep_write_test(Stream, Term, N, PointerList) ->
+ WriteList = lists:duplicate(random:uniform(N), Term),
+ {ok, Pointer} = write_term(Stream, WriteList),
+ deep_write_test(Stream, Term, N-1, [Pointer | PointerList]).