You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@drill.apache.org by "daniel kelly (Jira)" <ji...@apache.org> on 2019/09/25 13:24:00 UTC

[jira] [Commented] (DRILL-7290) “Failed to construct kafka consumer” using Apache Drill

    [ https://issues.apache.org/jira/browse/DRILL-7290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16937727#comment-16937727 ] 

daniel kelly commented on DRILL-7290:
-------------------------------------

 
{code:java}
This is because the kafka storage plugin never explicitly closes its connections to kafka. It relies on the idle timeout at the server ( connections.max.idle.ms ) which defaults to 10 minutes.

I modified the code locally under contrib/storage-kafka to close as appropriate, which solved the issue.

 git diff src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java

   @Override
@@ -110,4 +114,11 @@ public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>>
   public ConsumerRecord<byte[], byte[]> next() {
     return recordIter.next();
   }
+
+   we need to close the kafka connection to prevent connection resource leak
+  public void close()
+  {
+      // TODO should possibly use the specific timeout duration close signature
+      this.kafkaConsumer.close();
+  }
 }


git diff src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java

@@ -140,6 +140,7 @@ public class KafkaRecordReader extends AbstractRecordReader {
     logger.info("Total time to fetch messages from {}:{} is - {} milliseconds", subScanSpec.getTopicName(),
         subScanSpec.getPartitionId(), msgItr.getTotalFetchTime());
     messageReader.close();
+    msgItr.close();
   }
{code}
 

> “Failed to construct kafka consumer” using Apache Drill
> -------------------------------------------------------
>
>                 Key: DRILL-7290
>                 URL: https://issues.apache.org/jira/browse/DRILL-7290
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Functions - Drill
>    Affects Versions: 1.14.0, 1.16.0
>            Reporter: Aravind Voruganti
>            Priority: Blocker
>
> {noformat}
>  {noformat}
> I am using the Apache Drill (1.14) JDBC driver in my application which consumes the data from the Kafka. The application works just fine for some time and after few iterations it fails to execute due to the following *Too many files open* issue. I made sure there are no file handle leaks in my code but still nor sure why this issue is happening?
>  
> It looks like the issue is happening from with-in the Apache drill libraries when constructing the Kafka consumer. Can any one please guide me help this problem fixed?
> The problem perishes when I restart my Apache drillbit but very soon it happens again. I did check the file descriptor count on my unix machine using *{{ulimit -a | wc -l}} & {{lsof -a -p <PID> | wc -l}}* before and after the drill process restart and it seems the drill process is considerably taking a lot of file descriptors. I tried increasing the file descriptor count on the system but still no luck.
> I have followed the Apache Drill storage plugin documentation in configuring the Kafka plugin into Apache Drill at [https://drill.apache.org/docs/kafka-storage-plugin/]
> Any help on this issue is highly appreciated. Thanks.
> JDBC URL: *{{jdbc:drill:drillbit=localhost:31010;schema=kafka}}*
> NOTE: I am pushing down the filters in my query {{SELECT * FROM myKafkaTopic WHERE kafkaMsgTimestamp > 1560210931626}}
>  
> 2019-06-11 08:43:13,639 [230033ed-d410-ae7c-90cb-ac01d3b404cc:foreman] INFO o.a.d.e.store.kafka.KafkaGroupScan - User Error Occurred: Failed to fetch start/end offsets of the topic myKafkaTopic (Failed to construct kafka consumer)
>  org.apache.drill.common.exceptions.UserException: DATA_READ ERROR: Failed to fetch start/end offsets of the topic myKafkaTopic
> Failed to construct kafka consumer
> [Error Id: 73f896a7-09d4-425b-8cd5-f269c3a6e69a ]
>  at org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:633) ~[drill-common-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.store.kafka.KafkaGroupScan.init(KafkaGroupScan.java:198) [drill-storage-kafka-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.store.kafka.KafkaGroupScan.<init>(KafkaGroupScan.java:98) [drill-storage-kafka-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.store.kafka.KafkaStoragePlugin.getPhysicalScan(KafkaStoragePlugin.java:83) [drill-storage-kafka-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.store.AbstractStoragePlugin.getPhysicalScan(AbstractStoragePlugin.java:111) [drill-java-exec-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.planner.logical.DrillTable.getGroupScan(DrillTable.java:99) [drill-java-exec-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.planner.logical.DrillScanRel.<init>(DrillScanRel.java:89) [drill-java-exec-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.planner.logical.DrillScanRel.<init>(DrillScanRel.java:69) [drill-java-exec-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.planner.logical.DrillScanRel.<init>(DrillScanRel.java:62) [drill-java-exec-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.planner.logical.DrillScanRule.onMatch(DrillScanRule.java:38) [drill-java-exec-1.14.0.jar:1.14.0]
>  at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212) [calcite-core-1.16.0-drill-r6.jar:1.16.0-drill-r6]
>  at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:652) [calcite-core-1.16.0-drill-r6.jar:1.16.0-drill-r6]
>  at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368) [calcite-core-1.16.0-drill-r6.jar:1.16.0-drill-r6]
>  at org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.transform(DefaultSqlHandler.java:429) [drill-java-exec-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.transform(DefaultSqlHandler.java:369) [drill-java-exec-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.convertToRawDrel(DefaultSqlHandler.java:255) [drill-java-exec-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.convertToDrel(DefaultSqlHandler.java:318) [drill-java-exec-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.getPlan(DefaultSqlHandler.java:180) [drill-java-exec-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.planner.sql.DrillSqlWorker.getQueryPlan(DrillSqlWorker.java:145) [drill-java-exec-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.planner.sql.DrillSqlWorker.getPlan(DrillSqlWorker.java:83) [drill-java-exec-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.work.foreman.Foreman.runSQL(Foreman.java:567) [drill-java-exec-1.14.0.jar:1.14.0]
>  at org.apache.drill.exec.work.foreman.Foreman.run(Foreman.java:266) [drill-java-exec-1.14.0.jar:1.14.0]
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_181]
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_181]
>  at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
>  Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:765) ~[kafka-clients-0.11.0.1.jar:na]
>  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:633) ~[kafka-clients-0.11.0.1.jar:na]
>  at org.apache.drill.exec.store.kafka.KafkaGroupScan.init(KafkaGroupScan.java:168) [drill-storage-kafka-1.14.0.jar:1.14.0]
>  ... 23 common frames omitted
>  Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
>  at org.apache.kafka.common.network.Selector.<init>(Selector.java:129) ~[kafka-clients-0.11.0.1.jar:na]
>  at org.apache.kafka.common.network.Selector.<init>(Selector.java:156) ~[kafka-clients-0.11.0.1.jar:na]
>  at org.apache.kafka.common.network.Selector.<init>(Selector.java:160) ~[kafka-clients-0.11.0.1.jar:na]
>  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:701) ~[kafka-clients-0.11.0.1.jar:na]
>  ... 25 common frames omitted
>  Caused by: java.io.IOException: Too many open files
>  at sun.nio.ch.EPollArrayWrapper.epollCreate(Native Method) ~[na:1.8.0_181]
>  at sun.nio.ch.EPollArrayWrapper.<init>(EPollArrayWrapper.java:130) ~[na:1.8.0_181]
>  at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:69) ~[na:1.8.0_181]
>  at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36) ~[na:1.8.0_181]
>  at java.nio.channels.Selector.open(Selector.java:227) ~[na:1.8.0_181]
>  at org.apache.kafka.common.network.Selector.<init>(Selector.java:127) ~[kafka-clients-0.11.0.1.jar:na]
> {code:java}
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)