Posted to dev@gobblin.apache.org by "Chen Guo (Jira)" <ji...@apache.org> on 2020/05/15 23:37:00 UTC
[jira] [Resolved] (GOBBLIN-1140) Listeners monitoring added FlowSpecs are fired before Specs are successfully added
[ https://issues.apache.org/jira/browse/GOBBLIN-1140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chen Guo resolved GOBBLIN-1140.
-------------------------------
Resolution: Fixed
> Listeners monitoring added FlowSpecs are fired before Specs are successfully added
> ----------------------------------------------------------------------------------
>
> Key: GOBBLIN-1140
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1140
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Chen Guo
> Priority: Critical
>
> This bug was introduced in the commit for [GOBBLIN-1082] "compile a flow before storing it in spec catalog" on 3/12/2020.
> Previously, the listeners were triggered after the spec had been added to specStore.
> {code:java}
> specStore.addSpec(spec);
> metrics.updatePutSpecTime(startTime);
> if (triggerListener) {
>   AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
>   for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
>     responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
>   }
> }
> {code}
> After the change, the order is reversed: the listeners fire first, and the spec is added to specStore only if compilation succeeds.
> {code:java}
> if (triggerListener) {
>   AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
>   for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
>     responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
>   }
> }
> boolean compileSuccess = isCompileSuccessful(responseMap);
> if (compileSuccess) {
>   long startTime = System.currentTimeMillis();
>   metrics.updatePutSpecTime(startTime);
>   try {
>     specStore.addSpec(spec);
>   } catch (IOException e) {
>     throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
>   }
>   responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
> }
> {code}
> Because of this bug, the flow spec for an on-demand trigger cannot be deleted. One of the listeners is GobblinServiceJobScheduler, which deletes the flowSpec file. The deletion is now attempted before the flowSpec file has been created, so it has nothing to remove.
>
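The ordering problem described above can be reproduced in isolation. The following is a minimal sketch, not Gobblin code and not the patch that resolved this issue: `OrderingSketch`, `SpecStore`, `Listener`, `RunOnceListener` and `leftoverAfterPut` are hypothetical stand-ins, and the store is a plain in-memory map rather than a file-backed spec store. It only shows that a listener which deletes the spec leaves it behind when it runs before the store write.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration only; none of these are Gobblin classes.
class OrderingSketch {

    /** Minimal in-memory spec store keyed by spec URI. */
    static class SpecStore {
        private final Map<String, String> specs = new HashMap<>();

        void addSpec(String uri, String body) { specs.put(uri, body); }

        /** Returns true only if a spec was actually removed. */
        boolean deleteSpec(String uri) { return specs.remove(uri) != null; }

        boolean exists(String uri) { return specs.containsKey(uri); }
    }

    interface Listener {
        void onAddSpec(String uri);
    }

    /** Mimics a scheduler that deletes a run-once spec as soon as it is notified. */
    static class RunOnceListener implements Listener {
        private final SpecStore store;

        RunOnceListener(SpecStore store) { this.store = store; }

        @Override
        public void onAddSpec(String uri) {
            // A no-op if the spec has not been stored yet.
            store.deleteSpec(uri);
        }
    }

    /**
     * Adds one spec and reports whether it is still in the store afterwards.
     * storeFirst = true  : store, then notify listeners (order before the regression).
     * storeFirst = false : notify listeners, then store (order described in this issue).
     */
    static boolean leftoverAfterPut(boolean storeFirst) {
        SpecStore store = new SpecStore();
        List<Listener> listeners = new ArrayList<>();
        listeners.add(new RunOnceListener(store));
        String uri = "flow/on-demand/1";

        if (storeFirst) {
            store.addSpec(uri, "spec-body");
        }
        for (Listener listener : listeners) {
            listener.onAddSpec(uri);
        }
        if (!storeFirst) {
            store.addSpec(uri, "spec-body");
        }
        return store.exists(uri);
    }

    public static void main(String[] args) {
        System.out.println("listeners first, spec left behind: " + leftoverAfterPut(false)); // true
        System.out.println("store first, spec left behind: " + leftoverAfterPut(true));      // false
    }
}
```

In the sketch, the only difference between the two runs is whether the store write happens before or after the listener callbacks. How the real fix reconciles this with the compile-before-store requirement from GOBBLIN-1082 is not stated in this message.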
--
This message was sent by Atlassian Jira
(v8.3.4#803005)