Posted to dev@lucene.apache.org by "Ethan Tao (JIRA)" <ji...@apache.org> on 2012/05/21 20:57:41 UTC

[jira] [Commented] (SOLR-2694) LogUpdateProcessor not thread safe

    [ https://issues.apache.org/jira/browse/SOLR-2694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13280359#comment-13280359 ] 

Ethan Tao commented on SOLR-2694:
---------------------------------

Hi, we are seeing the same issue with the latest snapshot.
It is easy to reproduce by updating the same set of docs repeatedly. We also have a 10-second commit interval in our env.
I checked the LogUpdateProcessor code. It is not thread safe with respect to the "toLog" object, because concurrent threads may be calling other methods while finish() is running. The problem arises when "sb.append(toLog)" is called: StringBuilder walks the NamedList to build the string. If other methods modify the objects that reside within "toLog" at that moment, the exception is triggered. These objects include deletes, adds, etc. Also, the keys for these objects come in 7 different types (commit/rollback, etc.), so any size change to the SimpleOrderedMap (i.e., toLog) may also trigger the issue during the call to "sb.append(toLog)" in finish().
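
To illustrate the failure mode outside of Solr, here is a minimal sketch. It is not Solr code: the class name ToStringRaceSketch and the "mutator" element are hypothetical, and the concurrent writer is simulated deterministically in a single thread by letting an element grow the list from inside its own toString(). It only assumes that the logged values are plain java.util.ArrayList instances, whose iterators are fail-fast. The exact exception seen in a real Solr run may differ.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

public class ToStringRaceSketch {

    /** Returns true if appending the list to a StringBuilder failed mid-iteration. */
    static boolean appendWhileMutating() {
        final List<Object> adds = new ArrayList<Object>();
        adds.add("doc1");

        // Hypothetical stand-in for "another thread calls processAdd() right now":
        // the list grows while its own toString() is still iterating over it.
        adds.add(new Object() {
            @Override
            public String toString() {
                adds.add("doc3");
                return "doc2";
            }
        });

        StringBuilder sb = new StringBuilder();
        try {
            // append(Object) calls adds.toString(), which iterates the list
            sb.append(adds);
            return false;
        } catch (ConcurrentModificationException e) {
            // the fail-fast iterator noticed the structural change
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("failed mid-append: " + appendWhileMutating());
    }
}
```

With real threads the same interleaving happens only occasionally, which would explain why the problem shows up under repeated concurrent updates rather than on every request.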
We did a quick experiment: we modified the LogUpdateProcessorFactory file to synchronize on toLog in all methods, then changed the chain definition in solrconfig.xml to call our own LogUpdateProcessor instead. The issue went away. I am attaching the custom code below for your reference.

Can someone please re-open and fix the issue? 
Thanks.

-Ethan 

class LogUpdateProcessor extends UpdateRequestProcessor {

...

    @Override
    public void processAdd(AddUpdateCommand cmd) throws IOException {
        if (logDebug) { log.debug("PRE_UPDATE " + cmd.toString()); }

        // call delegate first so we can log things like the version that get set later
        if (next != null) next.processAdd(cmd);

        synchronized (toLog) {

            // Add a list of added id's to the response
            if (adds == null) {
                adds = new ArrayList<String>();
                toLog.add("add",adds);
            }

            if (adds.size() < maxNumToLog) {
                long version = cmd.getVersion();
                String msg = cmd.getPrintableId();
                if (version != 0) msg = msg + " (" + version + ')';
                adds.add(msg);
            }


            numAdds++;

        }
    }

    @Override
    public void processDelete( DeleteUpdateCommand cmd ) throws IOException {
        if (logDebug) { log.debug("PRE_UPDATE " + cmd.toString()); }
        if (next != null) next.processDelete(cmd);

        synchronized (toLog) {
            if (cmd.isDeleteById()) {
                if (deletes == null) {
                    deletes = new ArrayList<String>();
                    toLog.add("delete",deletes);
                }
                if (deletes.size() < maxNumToLog) {
                    long version = cmd.getVersion();
                    String msg = cmd.getId();
                    if (version != 0) msg = msg + " (" + version + ')';
                    deletes.add(msg);
                }
            } else {
                if (toLog.size() < maxNumToLog) {
                    long version = cmd.getVersion();
                    String msg = cmd.query;
                    if (version != 0) msg = msg + " (" + version + ')';
                    toLog.add("deleteByQuery", msg);
                }
            }
            numDeletes++;
        }

    }

    @Override
    public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
        if (logDebug) { log.debug("PRE_UPDATE " + cmd.toString()); }
        if (next != null) next.processMergeIndexes(cmd);

        synchronized (toLog) {
            toLog.add("mergeIndexes", cmd.toString());
        }
    }

    @Override
    public void processCommit( CommitUpdateCommand cmd ) throws IOException {
        if (logDebug) { log.debug("PRE_UPDATE " + cmd.toString()); }
        if (next != null) next.processCommit(cmd);


        synchronized (toLog) {
            final String msg = cmd.optimize ? "optimize" : "commit";
            toLog.add(msg, "");
        }
    }

    /**
     * @since Solr 1.4
     */
    @Override
    public void processRollback( RollbackUpdateCommand cmd ) throws IOException {
        if (logDebug) { log.debug("PRE_UPDATE " + cmd.toString()); }
        if (next != null) next.processRollback(cmd);

        synchronized (toLog) {
            toLog.add("rollback", "");
        }
    }


    @Override
    public void finish() throws IOException {
        if (logDebug) { log.debug("PRE_UPDATE finish()"); }
        if (next != null) next.finish();

        // LOG A SUMMARY WHEN ALL DONE (INFO LEVEL)

        NamedList<Object> stdLog = rsp.getToLog();

        StringBuilder sb = new StringBuilder(req.getCore().getLogId());

        for (int i=0; i<stdLog.size(); i++) {
            String name = stdLog.getName(i);
            Object val = stdLog.getVal(i);
            if (name != null) {
                sb.append(name).append('=');
            }
            sb.append(val).append(' ');
        }

        stdLog.clear();   // make it so SolrCore.exec won't log this again

        synchronized (toLog) {

            // if id lists were truncated, show how many more there were
            if (adds != null && numAdds > maxNumToLog) {
                adds.add("... (" + numAdds + " adds)");
            }
            if (deletes != null && numDeletes > maxNumToLog) {
                deletes.add("... (" + numDeletes + " deletes)");
            }
            long elapsed = rsp.getEndTime() - req.getStartTime();

            sb.append(toLog).append(" 0 ").append(elapsed);

        }

        log.info(sb.toString());
    }
}
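
The locking pattern used in the class above can also be tried in isolation. The following is a rough standalone sketch, not part of the attached processor: SynchronizedLogSketch, recordAdd(), summary() and stress() are made-up names, and a private lock object is assumed in place of synchronizing on toLog itself. Several writer threads record ids while a reader thread keeps building the summary string, the way finish() does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class SynchronizedLogSketch {
    private final Object lock = new Object();          // plays the role of the toLog monitor
    private final List<String> adds = new ArrayList<>();
    private int numAdds;

    /** Writer side: corresponds to the synchronized block in processAdd(). */
    void recordAdd(String id) {
        synchronized (lock) {
            adds.add(id);
            numAdds++;
        }
    }

    /** Reader side: corresponds to sb.append(toLog) inside finish(). */
    String summary() {
        synchronized (lock) {
            return new StringBuilder("adds=").append(adds)
                    .append(" numAdds=").append(numAdds).toString();
        }
    }

    int count() {
        synchronized (lock) {
            return numAdds;
        }
    }

    /** Runs writers against one summarizing reader; returns how many summary() calls threw. */
    static int stress(SynchronizedLogSketch log, int writers, int perWriter) {
        AtomicInteger failures = new AtomicInteger();
        AtomicBoolean done = new AtomicBoolean(false);

        Thread reader = new Thread(() -> {
            while (!done.get()) {
                try {
                    log.summary();
                } catch (RuntimeException e) {
                    failures.incrementAndGet();
                }
            }
        });
        reader.start();

        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < writers; w++) {
            final int id = w;
            Thread t = new Thread(() -> {
                for (int i = 0; i < perWriter; i++) {
                    log.recordAdd("w" + id + "-" + i);
                }
            });
            threads.add(t);
            t.start();
        }

        try {
            for (Thread t : threads) {
                t.join();
            }
            done.set(true);
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for threads", e);
        }
        return failures.get();
    }

    public static void main(String[] args) {
        SynchronizedLogSketch log = new SynchronizedLogSketch();
        int failures = stress(log, 4, 1000);
        System.out.println("failures=" + failures + " numAdds=" + log.count());
    }
}
```

Because every read and write of the list goes through the same monitor, the reader never iterates the list while a writer is changing it, and no increment of numAdds is lost. Removing the synchronized blocks from this sketch would be expected to bring the intermittent failures back.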
                
> LogUpdateProcessor not thread safe
> ----------------------------------
>
>                 Key: SOLR-2694
>                 URL: https://issues.apache.org/jira/browse/SOLR-2694
>             Project: Solr
>          Issue Type: Bug
>          Components: update
>    Affects Versions: 1.4.1, 3.1, 3.2, 3.3, 4.0
>            Reporter: Jan Høydahl
>
> Using the LogUpdateProcessor while feeding in multiple parallel threads does not work, as LogUpdateProcessor is not thread safe.
