Posted to dev@oozie.apache.org by "Purshotam Shah (JIRA)" <ji...@apache.org> on 2014/05/23 21:08:02 UTC

[jira] [Commented] (OOZIE-1699) Some of the commands submitted to Oozie internal queue are never executed

    [ https://issues.apache.org/jira/browse/OOZIE-1699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14007552#comment-14007552 ] 

Purshotam Shah commented on OOZIE-1699:
---------------------------------------

We noticed that this fix is causing congestion: queue operations were taking longer than expected.

Two locks are acquired when queuing a command, and this is causing the delay.

First, all of the queue methods of CallableQueueService are synchronized, so every caller serializes on the service's monitor.

{code}

public synchronized boolean queue(XCallable<?> callable, long delay) {
    if (callable == null) {
        return true;
    }
    boolean queued = false;
    if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
        log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size());
    }
    else {
        checkInterruptTypes(callable);
        queued = queue(new CallableWrapper(callable, delay), false);
        if (queued) {
            incrCounter(INSTR_QUEUED_COUNTER, 1);
        }
        else {
            log.warn("Could not queue callable");
        }
    }
    return queued;
}

public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) {
    boolean queued;
    if (callables == null || callables.size() == 0) {
        queued = true;
    }
    else if (callables.size() == 1) {
        queued = queue(callables.get(0), delay);
    }
    else {
        XCallable<?> callable = new CompositeCallable(callables);
        queued = queue(callable, delay);
        if (queued) {
            incrCounter(INSTR_QUEUED_COUNTER, callables.size());
        }
    }
    return queued;
}
{code}
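A minimal sketch of the first lock, assuming a hypothetical `SynchronizedQueueDemo` class that models only the locking shape of `CallableQueueService` (not its real fields, counters, or safe-mode logic). Because both methods are `synchronized` on the instance, all producer threads contend for one monitor, and `queueSerial` re-enters that same monitor when it calls `queue`. The `Thread.sleep(1)` is an arbitrary stand-in for work done while the monitor is held.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for CallableQueueService: only the locking shape is modeled.
class SynchronizedQueueDemo {
    private final AtomicInteger inside = new AtomicInteger();    // callers currently inside queue()
    private final AtomicInteger maxInside = new AtomicInteger(); // highest value ever observed
    private final List<Runnable> items = new ArrayList<>();

    // Like queue(XCallable, long): the whole method holds the instance monitor.
    synchronized boolean queue(Runnable r) {
        int now = inside.incrementAndGet();
        maxInside.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(1); // assumed stand-in for work done while holding the monitor
            return items.add(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            inside.decrementAndGet();
        }
    }

    // Like queueSerial(List, long): wraps the list in one composite entry
    // (as CompositeCallable does) and re-enters the same reentrant monitor via queue().
    synchronized boolean queueSerial(List<Runnable> rs) {
        if (rs.isEmpty()) {
            return true;
        }
        Runnable composite = () -> rs.forEach(Runnable::run);
        return queue(composite);
    }

    synchronized int size() {
        return items.size();
    }

    // Starts `threads` producers; each queues `perThread` commands plus one serial batch.
    // Returns {queued entries, max callers ever inside queue() at once}.
    static int[] run(int threads, int perThread) {
        SynchronizedQueueDemo svc = new SynchronizedQueueDemo();
        Runnable noop = () -> { };
        List<Thread> producers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    svc.queue(noop);
                }
                svc.queueSerial(Arrays.asList(noop, noop)); // one composite entry
            });
            producers.add(t);
            t.start();
        }
        for (Thread t : producers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { svc.size(), svc.maxInside.get() };
    }

    public static void main(String[] args) {
        int[] r = run(8, 10);
        System.out.println("queued=" + r[0] + " maxConcurrent=" + r[1]);
    }
}
```

Running `main` prints `queued=88 maxConcurrent=1`: no matter how many producers there are, only one thread is ever inside `queue` at a time.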

Second, the add method of PriorityDelayQueue now needs to acquire a ReentrantLock,
which was added as part of this patch.
{code}

+        boolean accepted;
+        lock.lock();
+        try {
+            accepted = queues[queueElement.getPriority()].offer(queueElement);
+            debug("offer([{0}]), to P[{1}] delay[{2}ms] accepted[{3}]", queueElement.getElement().toString(),
+                  queueElement.getPriority(), queueElement.getDelay(TimeUnit.MILLISECONDS), accepted);
+            if (accepted) {
+                if (currentSize != null) {
+                    currentSize.incrementAndGet();
+                }
+                queueElement.inQueue = true;
             }
-            queueElement.inQueue = true;
+        } finally {
+            lock.unlock();
         }
{code}
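The nesting of the two locks can be illustrated with a small, hypothetical model (`TwoLockQueueDemo`, `Service`, and `LockedQueue` are illustrative names, not Oozie classes). An outer `synchronized` method stands in for `CallableQueueService.queue`, and an inner `offer` guarded by a `ReentrantLock` stands in for the patched `PriorityDelayQueue` add path. The sketch only checks that both locks are held at the moment of the offer; it does not reproduce the reported latency.

```java
import java.util.PriorityQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of the queuing path after the patch.
class TwoLockQueueDemo {

    // Stand-in for PriorityDelayQueue: offer() is guarded by its own ReentrantLock.
    static class LockedQueue {
        final ReentrantLock lock = new ReentrantLock();
        private final PriorityQueue<Integer> items = new PriorityQueue<>();
        boolean bothLocksHeldDuringLastOffer;

        boolean offer(Integer item, Object outerMonitor) {
            lock.lock();
            try {
                // Here the caller holds the service monitor AND this queue lock.
                bothLocksHeldDuringLastOffer =
                        Thread.holdsLock(outerMonitor) && lock.isHeldByCurrentThread();
                return items.offer(item);
            } finally {
                lock.unlock();
            }
        }

        int size() {
            lock.lock();
            try {
                return items.size();
            } finally {
                lock.unlock();
            }
        }
    }

    // Stand-in for CallableQueueService.queue(...): synchronized on the service instance.
    static class Service {
        final LockedQueue delayQueue = new LockedQueue();

        synchronized boolean queue(Integer command) {
            return delayQueue.offer(command, this); // second lock taken inside the first
        }
    }

    public static void main(String[] args) {
        Service svc = new Service();
        svc.queue(42);
        System.out.println("size=" + svc.delayQueue.size()
                + " bothLocksHeld=" + svc.delayQueue.bothLocksHeldDuringLastOffer
                + " innerLockReleased=" + !svc.delayQueue.lock.isLocked());
    }
}
```

Running `main` prints `size=1 bothLocksHeld=true innerLockReleased=true`. In this model every producer must first win the service monitor and then the queue lock, so contention on either lock adds to the time spent queuing a single command.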
Acquiring two locks to queue a single command was slowing down the system:
the server was taking more than 4 seconds just to queue one command.


After we rolled back this patch, the server was back to normal.

> Some of the commands submitted to Oozie internal queue are never executed
> -------------------------------------------------------------------------
>
>                 Key: OOZIE-1699
>                 URL: https://issues.apache.org/jira/browse/OOZIE-1699
>             Project: Oozie
>          Issue Type: Bug
>            Reporter: Srikanth Sundarrajan
>            Assignee: Srikanth Sundarrajan
>            Priority: Critical
>             Fix For: trunk
>
>         Attachments: OOZIE-1699-v1-no-prefix.patch, OOZIE-1699.patch
>
>
> At scale, we are seeing issues where some commands submitted to the command queue in CallableQueueService aren't getting executed at all.



--
This message was sent by Atlassian JIRA
(v6.2#6252)