Posted to issues@flink.apache.org by "Pankaj (JIRA)" <ji...@apache.org> on 2018/03/19 12:49:00 UTC
[jira] [Reopened] (FLINK-9009) Error| You are creating too many
HashedWheelTimer instances. HashedWheelTimer is a shared resource that
must be reused across the application, so that only a few instances are
created.
[ https://issues.apache.org/jira/browse/FLINK-9009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Pankaj reopened FLINK-9009:
---------------------------
The above issue occurs only when we use Flink's CassandraSink.
With Flink parallelism = 10, writing stream events from a Kafka topic to Cassandra makes the system throw this error:
*You are creating too many HashedWheelTimer instances. HashedWheelTimer is a shared resource that must be reused across the application, so that only a few instances are created.* This also appears to be the reason for the JVM running out of memory.
For your reference: it seems the Flink CassandraSink is not closing its Cassandra session when we increase parallelism.
The issue is easy to reproduce:
step1: CPU: 200 millicores
step2: Kafka topic
step3: parallelism=10
step4: memory 512MB
Classes: CassandraSink
Snippet:
public class App {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(20);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>("test", new SimpleStringSchema(),
                properties);

        DataStream<AtomicEvent> stream = env.addSource(myConsumer).map(new MapFunction<String, AtomicEvent>() {
            private static final long serialVersionUID = 1L;

            @Override
            public AtomicEvent map(String value) throws Exception {
                return new AtomicEvent("xyx", new Date(), "1", ByteBuffer.wrap("a".getBytes()), "1");
            }
        });

        CassandraSink.addSink(stream).setClusterBuilder(new ClusterBuilder() {
            private static final long serialVersionUID = 1L;

            @Override
            public Cluster buildCluster(Cluster.Builder builder) {
                return builder.addContactPoint("127.0.0.1").build();
            }
        }).build();

        env.execute();
    }
}
Mar 19, 2018 6:16:19 PM com.datastax.driver.core.NettyUtil <clinit>
INFO: Did not find Netty's native epoll transport in the classpath, defaulting to NIO.
Mar 19, 2018 6:16:19 PM io.netty.util.internal.logging.Slf4JLogger error
SEVERE: LEAK: You are creating too many HashedWheelTimer instances. HashedWheelTimer is a shared resource that must be reused across the JVM,so that only a few instances are created.
There seems to be an issue in Flink's CassandraSinkBase implementation.
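
To illustrate what the warning asks for, here is a minimal sketch of a reference-counted timer shared by all parallel sink instances in one JVM. It is not Flink's or the Cassandra driver's code: SharedTimer and SharedTimerDemo are hypothetical names, and a JDK ScheduledExecutorService stands in for Netty's HashedWheelTimer so that the example runs without extra dependencies. It assumes that every parallel sink instance would otherwise build its own timer and that each instance's close() is actually called.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/**
 * Hypothetical reference-counted holder for one JVM-wide timer.
 * ScheduledExecutorService is only a stand-in for HashedWheelTimer.
 */
final class SharedTimer {
    private static ScheduledExecutorService timer; // guarded by SharedTimer.class
    private static int users;                      // sink instances currently holding the timer
    private static int created;                    // how many timers were ever built

    private SharedTimer() {}

    /** Would be called from a sink's open(): only the first user creates the timer. */
    static synchronized ScheduledExecutorService acquire() {
        if (users == 0) {
            timer = Executors.newSingleThreadScheduledExecutor();
            created++;
        }
        users++;
        return timer;
    }

    /** Would be called from a sink's close(): the last user shuts the timer down. */
    static synchronized void release() {
        if (users == 0) {
            throw new IllegalStateException("release() without matching acquire()");
        }
        if (--users == 0) {
            timer.shutdownNow();
            timer = null;
        }
    }

    static synchronized int users() { return users; }

    static synchronized int created() { return created; }
}

final class SharedTimerDemo {
    public static void main(String[] args) {
        int parallelism = 10; // same value as step3 above
        for (int i = 0; i < parallelism; i++) {
            SharedTimer.acquire(); // each parallel sink instance opens
        }
        System.out.println("timers created: " + SharedTimer.created());
        for (int i = 0; i < parallelism; i++) {
            SharedTimer.release(); // each parallel sink instance closes
        }
        System.out.println("users left: " + SharedTimer.users());
    }
}
```

With this pattern the number of timers stays at one regardless of parallelism, and the timer is only released once every instance has closed. If close() is skipped for some instances, as suspected above, the count never returns to zero and the resource stays alive.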
> Error| You are creating too many HashedWheelTimer instances. HashedWheelTimer is a shared resource that must be reused across the application, so that only a few instances are created.
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-9009
> URL: https://issues.apache.org/jira/browse/FLINK-9009
> Project: Flink
> Issue Type: Bug
> Environment: PaaS platform: OpenShift
> Reporter: Pankaj
> Priority: Major
>
> Steps to reproduce:
> 1- Flink with Kafka as a consumer, writing the stream to Cassandra using the Flink Cassandra sink.
> 2- In-memory JobManager and TaskManager with checkpointing every 5000 ms.
> 3- env.setParallelism(10), as the Kafka topic has 10 partitions.
> 4- There are around 13 unique streams in a single Flink runtime environment, each reading from Kafka, processing, and writing to Cassandra.
> Hardware: CPU 200 millicores. It is deployed on a PaaS platform on one node.
> Memory: 526 MB.
>
> When I start the server, it starts Flink and then all of a sudden stops with the above error. It also shows an out-of-memory error.
>
> It would be nice if anybody could suggest whether something is wrong.
>
> Maven:
> flink-connector-cassandra_2.11: 1.3.2
> flink-streaming-java_2.11: 1.4.0
> flink-connector-kafka-0.11_2.11: 1.4.0
>
>
>