Posted to user@flink.apache.org by Abhijeet Kumar <ab...@sentienz.com> on 2018/11/13 08:06:01 UTC

Flink Streaming sink to InfluxDB

Hello Team,

I'm new to Flink and am writing a job that reads data from Kafka and sinks it to InfluxDB. I based my code on this example:

https://github.com/apache/bahir-flink/blob/master/flink-connector-influxdb/examples/src/main/java/org/apache/flink/streaming/examples/influxdb/InfluxDBSinkExample.java

package com.dataartisans;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig;
import org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint;
import org.apache.flink.streaming.connectors.influxdb.InfluxDBSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.json.JSONObject;

import java.util.*;
import java.util.concurrent.TimeUnit;

public class ReadFromKafka {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        DataStream<String> messageStream = env.addSource(
                new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"),
                        new SimpleStringSchema(), parameterTool.getProperties()));

        DataStream<InfluxDBPoint> formatStream = messageStream.rebalance().map(new MapFunction<String, InfluxDBPoint>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public InfluxDBPoint map(String value) throws Exception {
                JSONObject jsonObj = new JSONObject(value);

                HashMap<String, String> tags = new HashMap<>();
                tags.put("source", "kafka");
                tags.put("sink", "InfluxDB");

                HashMap<String, Object> fields = new HashMap<>();
                fields.put("first_name", jsonObj.getString("first_name"));
                fields.put("last_name", jsonObj.getString("last_name"));

                return new InfluxDBPoint("influxConnect", System.currentTimeMillis(), tags, fields);
            }
        });

        InfluxDBConfig influxDBConfig = InfluxDBConfig.builder("http://localhost:8086", "root", "root", "db_flink_test")
                .batchActions(1000)
                .flushDuration(100, TimeUnit.MILLISECONDS)
                .enableGzip(true)
                .build();

        formatStream.addSink(new InfluxDBSink(influxDBConfig));

        env.execute("InfluxDB Sink Example");
    }
}

This throws the following error:

12:41:51,364 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer  - Trying to get topic metadata from broker localhost:9092 in try 0/3
12:41:51,876 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer  - Topic flinkkafka has 1 partitions
12:41:51,928 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint must have a default constructor to be used as a POJO.
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/util/Preconditions
	at org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig.<init>(InfluxDBConfig.java:44)
	at org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig$Builder.build(InfluxDBConfig.java:221)
	at com.dataartisans.ReadFromKafka.main(ReadFromKafka.java:67)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.util.Preconditions
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 3 more

Can someone please help me resolve this problem?

Thanks,

Abhijeet Kumar
Software Development Engineer,
Sentienz Solutions Pvt Ltd
Cognitive Data Platform - Perceive the Data!
abhijeet.kumar@sentienz.com | www.sentienz.com | Bengaluru



Re: Flink Streaming sink to InfluxDB

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

This exception most likely indicates one of two things: 1) you are using
mismatched versions of Flink in your application code and the installed
Flink cluster, or 2) your application code isn't properly packaged.

From your exception, I'm guessing it is the latter. If so, I would
suggest taking a look at the POM of the Flink quickstart project to get an
idea of how to package your Flink applications properly [1].

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/java_api_quickstart.html
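(Adding a sketch of what the quickstart POM does, for context. The core Flink dependencies are marked `provided`, because the cluster already ships them, while connector dependencies are bundled into a fat jar via the maven-shade-plugin. The artifact IDs and version numbers below are assumptions for a Flink 1.6 / Scala 2.11 setup; check them against your own environment.)

```xml
<!-- Core Flink API: already on the cluster classpath, so scope it "provided". -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.6.2</version>
    <scope>provided</scope>
</dependency>

<!-- Connectors (Kafka, Bahir InfluxDB) are NOT on the cluster classpath;
     leave them in the default "compile" scope so they get bundled. -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-influxdb_2.11</artifactId>
    <version>1.0</version>
</dependency>

<!-- Build a fat jar containing the connector classes. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```

Note that `org.apache.flink.util.Preconditions` lives in flink-core, so a `NoClassDefFoundError` for it when running from the IDE or with plain `java -jar` usually means the `provided`-scoped Flink dependencies aren't on the runtime classpath at all; submitting the packaged jar to a Flink cluster (or enabling a run profile that includes `provided` dependencies) avoids that.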
