Posted to issues@ignite.apache.org by "Darren Edmonds (JIRA)" <ji...@apache.org> on 2015/08/04 10:06:04 UTC

[jira] [Comment Edited] (IGNITE-1179) Futures stop working, unknown cause.

    [ https://issues.apache.org/jira/browse/IGNITE-1179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14653221#comment-14653221 ] 

Darren Edmonds edited comment on IGNITE-1179 at 8/4/15 8:05 AM:
----------------------------------------------------------------

Hi Dmitriy,

The factory returns the appropriate listener for the type of job being run (there is only one type so far), so this line:

Code:
{code:title=FutureListenerFactory.java|borderStyle=solid}
    ...
    return new NodeJobFutureListener(jobService, service, assetStoreService, job, cacheService);
    ...
{code}
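
As a rough, stdlib-only illustration of the dispatch the factory performs, the sketch below switches on a job type and returns a completion callback. `JobType`, `ListenerFactory` and `RenderJobListener` are hypothetical names, and `java.util.function.Consumer` stands in for Ignite's `IgniteInClosure<IgniteFuture<NodeJobResult>>`, which the real listener implements.

```java
import java.util.function.Consumer;

// Hypothetical job types; only one exists so far.
enum JobType { RENDER }

// Hypothetical stand-in for NodeJobFutureListener: consumes the finished job's output.
class RenderJobListener implements Consumer<byte[]> {
    private final String jobKey;

    RenderJobListener(String jobKey) {
        this.jobKey = jobKey;
    }

    String jobKey() {
        return jobKey;
    }

    @Override
    public void accept(byte[] document) {
        // The real listener would save the document and update the job status here.
        System.out.println("Job " + jobKey + " produced " + document.length + " bytes");
    }
}

// Hypothetical factory: picks the listener that matches the job's type.
final class ListenerFactory {
    private ListenerFactory() {}

    static Consumer<byte[]> create(JobType type, String jobKey) {
        switch (type) {
            case RENDER:
                return new RenderJobListener(jobKey);
            default:
                throw new IllegalArgumentException("No listener for job type " + type);
        }
    }
}
```

Adding a job type then means adding an enum constant and one `case`, leaving callers of the factory unchanged.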

The NodeJobFutureListener itself processes the result. If the render failed, the job status is updated according to the kind of failure: either the job is reset to NEW so the scheduler picks it up for another attempt, or it is marked as FAILED.
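
The retry-or-fail rule can be sketched as a small pure function. `Status`, `Priority` and `RetryDecision` are hypothetical simplifications of `JobStatusEnum`, `JobPriorityEnum` and the job object, and the `retryable` flag stands in for `retryException(cause)`; only the rule itself (retry at high priority while attempts remain below the limit, otherwise fail, counting the attempt either way) is taken from the listener code.

```java
// Hypothetical simplified versions of JobStatusEnum and JobPriorityEnum.
enum Status { NEW, DONE, FAILED }

enum Priority { NORMAL, HIGH }

// Hypothetical result of handling a job whose future completed exceptionally.
final class RetryDecision {
    final Status status;
    final Priority priority;
    final int attempts;

    private RetryDecision(Status status, Priority priority, int attempts) {
        this.status = status;
        this.priority = priority;
        this.attempts = attempts;
    }

    // retryable: whether the root cause is worth retrying (assumed classification).
    // attempts: retries already made; limit: configured maximum number of retries.
    static RetryDecision onFailure(boolean retryable, int attempts, int limit, Priority current) {
        // Both branches count the attempt, mirroring job.retry() being called in each.
        int next = attempts + 1;
        if (retryable && attempts < limit) {
            // Requeue at high priority so the scheduler picks it up again soon.
            return new RetryDecision(Status.NEW, Priority.HIGH, next);
        }
        // Out of retries or not retryable: the job fails and keeps its priority.
        return new RetryDecision(Status.FAILED, current, next);
    }
}
```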

If the render succeeded, the output (a PDF byte array/stream) is saved to our filesystem-based asset store for later retrieval.
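
A minimal sketch of saving the rendered bytes to a filesystem-backed store with `java.nio.file`. The real `assetStoreService` is not shown, so the `FileAssetStore` name and the one-`<jobKey>.pdf`-per-job layout are assumptions for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical filesystem asset store: one file per job key under a root directory.
final class FileAssetStore {
    private final Path root;

    FileAssetStore(Path root) {
        this.root = root;
    }

    // Writes to a temporary file first and then moves it into place,
    // so a reader never sees a half-written PDF.
    Path save(String jobKey, byte[] document) throws IOException {
        Files.createDirectories(root);
        Path target = root.resolve(jobKey + ".pdf");
        Path tmp = Files.createTempFile(root, jobKey, ".tmp");
        Files.write(tmp, document);
        return Files.move(tmp, target,
                StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }

    // Reads a previously saved document back for later retrieval.
    byte[] load(String jobKey) throws IOException {
        return Files.readAllBytes(root.resolve(jobKey + ".pdf"));
    }
}
```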

In either case, the job permit (a java.util.concurrent.Semaphore permit), which the scheduler uses to track jobs in progress before releasing new ones to the worker nodes, is released, and the job's assets are deleted from the Ignite cache.
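
The permit handshake can be sketched with `java.util.concurrent.Semaphore`, using `CompletableFuture` as a stdlib stand-in for `IgniteFuture` and its `listen` callback. `PermitScheduler`, `maxInFlight` and `submit` are hypothetical names. Releasing in a `finally` block, as the listener does, returns the permit whether the result is processed or an exception is caught. One consequence worth noting: a callback that never runs never returns its permit, so once every permit is held the scheduler can no longer dispatch jobs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical scheduler that caps the number of jobs in flight with a Semaphore.
final class PermitScheduler {
    private final Semaphore permits;

    PermitScheduler(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocks until a permit is free, then runs the job asynchronously.
    // The completion callback releases the permit in a finally block,
    // so it is returned on success and on failure alike.
    <T> CompletableFuture<Void> submit(Supplier<T> job, Consumer<T> onResult) throws InterruptedException {
        permits.acquire();
        return CompletableFuture.supplyAsync(job).handle((result, error) -> {
            try {
                if (error == null) {
                    onResult.accept(result);
                }
                return null;
            } finally {
                permits.release();
            }
        });
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```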

Code:
{code:title=NodeJobFutureListener.java (implements IgniteInClosure<IgniteFuture<NodeJobResult>>)|borderStyle=solid}
    ...
    private void processResult(IgniteFuture<NodeJobResult> jobResult) throws UpdatingCoreException {
        try {
            NodeJobResult nodeJobResult = jobResult.get();
            if (nodeJobResult == null) {
                job.addStatus(JobStatusEnum.FAILED);
            } else {
                NodeJobError errorHandler = nodeJobResult.getNodeJobErrors();
                if (errorHandler.hasEntries()) {
                    saveErrors(job, errorHandler);
                }
                if (!errorHandler.hasFatal()) {
                    byte[] resultDocument = nodeJobResult.getDocument();
                    saveResult(job, resultDocument, job.getJobType());
                    job.addStatus(JobStatusEnum.DONE);
                } else {
                    job.addStatus(JobStatusEnum.FAILED);
                }
            }
            updateJob(job);
        } catch (IgniteException ex) {
            Throwable cause = Throwables.getRootCause(ex);
            LOG.warn("JOB [{}] GridException when obtaining result for this object : {}", job.getKey(), cause.getMessage());
            if (retryException(cause) && job.getRetryAttempts() < service.getJobRetryLimit()) { // Retry
                LOG.warn("JOB [{}] Reprocess job due to GridException exception - attempting to resubmit.\n\tException is {}", job.getKey(), cause.getLocalizedMessage());

                job.setPriority(JobPriorityEnum.HIGH);
                job.retry();
                job.addError(ErrorTypeEnum.ERROR, cause.toString());
                job.addStatus(JobStatusEnum.NEW);
            } else {
                LOG.error("JOB [{}] Failed to process this job. Setting status to FAILED.\n\tException is {}", job.getKey(), cause.getMessage());
                job.retry();
                job.addError(ErrorTypeEnum.FATAL, cause.toString());
                job.addStatus(JobStatusEnum.FAILED);
            }
            updateJob(job);
        } finally {
            service.releaseJobPermit(job.getKey().toString());
            // Remove any assets from cache.
            DeletedCacheAssetsEvent event = cacheService.deleteJobAssetsFromCache(new DeleteCacheAssetsEvent(job));
            if(event.getStatus() == DeletedEvent.StatusEnum.EXCEPTION){
                LOG.warn("An error occurred removing a job's assets from the cache.", event.getException());
            }
        }
    }
    ...
{code}
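
The `catch` block unwraps the `IgniteException` with `Throwables.getRootCause` (presumably Guava's). For readers without Guava, a stdlib-only equivalent walks the cause chain; the `RootCause` class name is hypothetical, and the identity set is a defensive guard against a chain that loops back on itself.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical stdlib-only counterpart to a root-cause helper.
final class RootCause {
    private RootCause() {}

    // Follows getCause() until the innermost throwable is reached.
    // The walk stops early if a throwable is seen twice (a cyclic chain).
    static Throwable of(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }
}
```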

The assets are removed from the Ignite cache with the following code:
{code:title=CacheServiceImpl.java|borderStyle=solid}
    ...
    public DeletedCacheAssetsEvent deleteGroupAssetsFromCache(DeleteCacheGroupAssetsEvent cacheGroupAssetsEvent) {
        LOG.debug("DeleteCache : Getting group Job IDs...");
        IgniteCache assetCache = grid.cache("ASSET");
        RequestedGroupJobIdsEvent groupJobIdsEvent = jobService.requestJobIds(new RequestGroupJobIdsEvent(cacheGroupAssetsEvent.getGroupId()));
        if (groupJobIdsEvent.getStatus() == RequestedEvent.StatusEnum.READ) {
            LOG.debug("DeleteCache : Removing entries from cache...");
            for (UUID jobId : groupJobIdsEvent.getGroupJobIds()) {
                assetCache.remove(jobId.toString());
                assetCache.clear(jobId.toString());
            }
            return new DeletedCacheAssetsEvent(DeletedEvent.StatusEnum.DELETED);
        } else if(groupJobIdsEvent.getStatus() == RequestedEvent.StatusEnum.NOT_FOUND){
            return new DeletedCacheAssetsEvent(DeletedEvent.StatusEnum.NOT_FOUND);
        } else {
            return new DeletedCacheAssetsEvent(groupJobIdsEvent.getException());
        }
    }
    ...
{code}
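
The loop above issues one `remove` and one `clear` per job ID. Because `IgniteCache` extends JCache's `javax.cache.Cache`, it also accepts a whole set of keys through `removeAll(Set)`, which may reduce round trips for large groups. The sketch below shows the key-collection step, with a plain `Map` standing in for the `ASSET` cache so it runs without Ignite; `GroupAssetCleaner` is a hypothetical name.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical helper: removes every asset entry belonging to a group of jobs.
final class GroupAssetCleaner {
    private GroupAssetCleaner() {}

    // Converts job IDs to the String keys used by the cache (jobId.toString()).
    static Set<String> keysFor(List<UUID> jobIds) {
        Set<String> keys = new LinkedHashSet<>();
        for (UUID id : jobIds) {
            keys.add(id.toString());
        }
        return keys;
    }

    // With a real IgniteCache this would be a single assetCache.removeAll(keys);
    // here a Map stands in for the "ASSET" cache and the removals are counted.
    static int removeGroup(Map<String, byte[]> cache, List<UUID> jobIds) {
        int removed = 0;
        for (String key : keysFor(jobIds)) {
            if (cache.remove(key) != null) {
                removed++;
            }
        }
        return removed;
    }
}
```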

> Futures stop working, unknown cause.
> ------------------------------------
>
>                 Key: IGNITE-1179
>                 URL: https://issues.apache.org/jira/browse/IGNITE-1179
>             Project: Ignite
>          Issue Type: Bug
>          Components: compute
>            Reporter: Darren Edmonds
>            Assignee: Andrey Gura
>         Attachments: ApplicationLogsAndConfig_1.3.2.7z
>
>
> Running against Apache Ignite version: 
> ignite-core (1.3.2), ignite-spring (1.3.2), ignite-indexing (1.3.2) and ignite-slf4j (1.3.2).
> The application was originally built against a GridGain version before moving to Apache Ignite.
> The web application (a client-only Ignite node) feeds jobs to the worker node(s) via its own internal queue. After a few thousand jobs have been processed successfully, the worker and server simply stop, with jobs still waiting in the web server queue. The worker node appears to be running normally, and its logs show every job completing the render successfully, but the future on the server is never triggered. Could this be a loss of connection to the node?
> I have attached the requested information to the ticket. It's highly likely that I've done something wrong rather than this being a bug in Ignite; any advice would be appreciated.
> Attachment contains:
> - Server stack
> - Worker node stack
> - Server log file (simple debug output; with Ignite logging at INFO level, no Ignite output and no exceptions were observed)
> - Worker log file (same setup as server log file)
> - Ignite configuration files for both server and worker.
> - Launch parameters for the java command line and tomcat7 server.
> Original forum post:
> http://apache-ignite-users.70518.x6.nabble.com/Futures-not-being-triggered-td773.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)