You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/12/03 15:44:00 UTC

[jira] [Commented] (NIFI-5838) Misconfigured Kite processors can block NiFi

    [ https://issues.apache.org/jira/browse/NIFI-5838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707381#comment-16707381 ] 

ASF GitHub Bot commented on NIFI-5838:
--------------------------------------

Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/3182
  
    Hey @ijokarumawak - just had the opportunity to test this on the cluster where I originally faced the issue. Only properly closing the `FileSystem` object didn't help. It's is necessary to also have the invalidation mechanism to prevent NiFi from being unavailable with the BLOCKED threads.


> Misconfigured Kite processors can block NiFi
> --------------------------------------------
>
>                 Key: NIFI-5838
>                 URL: https://issues.apache.org/jira/browse/NIFI-5838
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>            Reporter: Pierre Villard
>            Assignee: Pierre Villard
>            Priority: Critical
>
> I faced the following situation today: no way to access the NiFi UI (it was just hanging forever or throwing timeouts), no change in case of NiFi restart, even with disabling the auto resume feature.
> By looking at the thread dump, I found a lot of:
> {noformat}
> "NiFi Web Server-142" Id=142 BLOCKED  on org.apache.hadoop.ipc.Client$Connection@3843e7a6
>     at org.apache.hadoop.ipc.Client$Connection.addCall(Client.java:463)
>     at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:375)
>     at org.apache.hadoop.ipc.Client.getConnection(Client.java:1522)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1451)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>     at com.sun.proxy.$Proxy348.getBlockLocations(Unknown Source)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>     at com.sun.proxy.$Proxy349.getBlockLocations(Unknown Source)
>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
>     at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
>     at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
>     - waiting on java.lang.Object@46d84263
>     at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
>     at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>     at org.apache.nifi.processors.kite.AbstractKiteProcessor.getSchema(AbstractKiteProcessor.java:132)
>     at org.apache.nifi.processors.kite.AbstractKiteProcessor$3.validate(AbstractKiteProcessor.java:172)
>     at org.apache.nifi.components.PropertyDescriptor.validate(PropertyDescriptor.java:200)
>     at org.apache.nifi.components.AbstractConfigurableComponent.validate(AbstractConfigurableComponent.java:103)
>     at org.apache.nifi.controller.AbstractConfiguredComponent.validate(AbstractConfiguredComponent.java:329)
>     at org.apache.nifi.controller.StandardProcessorNode.isValid(StandardProcessorNode.java:968)
>     at org.apache.nifi.groups.StandardProcessGroup.getCounts(StandardProcessGroup.java:227)
>     at org.apache.nifi.groups.StandardProcessGroup.getCounts(StandardProcessGroup.java:261)
>     at org.apache.nifi.groups.StandardProcessGroup.getCounts(StandardProcessGroup.java:261)
>     at org.apache.nifi.groups.StandardProcessGroup.getCounts(StandardProcessGroup.java:261)
>     at org.apache.nifi.web.StandardNiFiServiceFacade.getSiteToSiteDetails(StandardNiFiServiceFacade.java:2609)
>     at org.apache.nifi.web.StandardNiFiServiceFacade$$FastClassBySpringCGLIB$$358780e0.invoke(<generated>)
>     at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
>     at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:720)
>     at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
>     at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:85)
>     at org.apache.nifi.web.NiFiServiceFacadeLock.proceedWithReadLock(NiFiServiceFacadeLock.java:137)
>     at org.apache.nifi.web.NiFiServiceFacadeLock.getLock(NiFiServiceFacadeLock.java:108)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:621)
>     at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:610)
>     at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:68)
>     at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
>     at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92)
>     at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
>     at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:655)
>     at org.apache.nifi.web.StandardNiFiServiceFacade$$EnhancerBySpringCGLIB$$6379e46a.getSiteToSiteDetails(<generated>)
>     at org.apache.nifi.web.api.SiteToSiteResource.getSiteToSiteDetails(SiteToSiteResource.java:160)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60)
>     at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205)
>     at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)
>     at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302)
>     at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)
>     at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
>     at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)
>     at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542)
>     at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473)
>     at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419)
>     at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409)
>     at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:409)
>     at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:558)
>     at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:733)
>     at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
>     at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:841)
>     at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1634)
>     at org.apache.nifi.web.filter.RequestLogger.doFilter(RequestLogger.java:66)
>     at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1621)
>     at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:316)
>     at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
>     at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
>     at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
>     at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:122)
>     at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
>     at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
>     at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
>     at org.apache.nifi.web.security.NiFiAuthenticationFilter.doFilter(NiFiAuthenticationFilter.java:59)
>     at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
>     at org.apache.nifi.web.security.NiFiAuthenticationFilter.doFilter(NiFiAuthenticationFilter.java:59)
>     at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
>     at org.apache.nifi.web.security.NiFiAuthenticationFilter.authenticate(NiFiAuthenticationFilter.java:83)
>     at org.apache.nifi.web.security.NiFiAuthenticationFilter.doFilter(NiFiAuthenticationFilter.java:57)
>     at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
>     at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:213)
>     at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:176)
>     at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346)
>     at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262)
>     at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1621)
>     at org.apache.nifi.web.filter.TimerFilter.doFilter(TimerFilter.java:51)
>     at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1613)
>     at org.apache.nifi.web.server.JettyServer$2.doFilter(JettyServer.java:908)
>     at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1621)
>     at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:541)
>     at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
>     at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
>     at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>     at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
>     at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
>     at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
>     at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
>     at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
>     at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
>     at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
>     at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
>     at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
>     at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>     at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
>     at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
>     at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>     at org.eclipse.jetty.server.Server.handle(Server.java:564)
>     at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
>     at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
>     at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
>     at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>     at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
>     at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
>     at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>     at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
>     at org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
>     at org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
>     at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
>     at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
>     at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
>     at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> This is because of the {{validate}} method of the Kite processors (and more specifically the following processors: {{ConvertAvroSchema}}, {{ConvertCSVToAvro}}, {{ConvertJSONToAvro}}).
> By going through the flow.xml.gz file, I found the bad processor which was configured with:
> {noformat}
> <property>
>   <name>Record schema</name>
>   <value>Liste_queue_id;Limite_alerte_nb;Limite_alerte_volume;processeur_group;Projet;Contact</value>
> </property>{noformat}
> It appears the users thought it was possible to provide a schema a bit like a CSV header.
> Because of this schema definition, the code:
> {code:java}
> try {
>   uri = new URI(uriOrLiteral);
> } catch (URISyntaxException e) {
>   // try to parse the schema as a literal
>   return parseSchema(uriOrLiteral);
> }
> {code}
> is not throwing any exception, and we go to this part of the code:
> {code:java}
> } else {
>   // try to open the file
>   Path schemaPath = new Path(uri);
>   FileSystem fs = schemaPath.getFileSystem(conf);
>   try (InputStream in = fs.open(schemaPath)) {
>     return parseSchema(uri, in);
>   }
> }
> {code}
> which is calling the HDFS Distributed file system.
> I honestly don't know why this call {{fs.open()}} is causing the BLOCKED thread... but it caused a lot of troubles.
> I'm not sure to see how this could be changed in a non-backward compatible way... I suggest to check if a scheme is detected and if not, to consider the processor as invalid. This could impact some users after an upgrade but we can document it in the upgrade guide if this approach is OK. If there is a better approach, let me know!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)