Posted to dev@bahir.apache.org by "nicole Wang (Jira)" <ji...@apache.org> on 2021/01/19 03:04:00 UTC
[jira] [Created] (BAHIR-258) Add NebulaGraph Connector for Flink
nicole Wang created BAHIR-258:
---------------------------------
Summary: Add NebulaGraph Connector for Flink
Key: BAHIR-258
URL: https://issues.apache.org/jira/browse/BAHIR-258
Project: Bahir
Issue Type: New Feature
Components: Flink Streaming Connectors
Affects Versions: Flink-1.0, Flink-Next
Reporter: nicole Wang
Fix For: Flink-1.0
NebulaGraph ([https://nebula-graph.io/]) is a graph database built for very large-scale graphs with millisecond latency. NebulaGraph is open source, distributed, scalable, and lightning fast. NebulaGraph source code: [https://github.com/vesoft-inc]
Graph databases are now widely used in real-time recommendation, knowledge graphs, financial risk control, and other fields, and these scenarios may use Flink to process data in real time.
To enrich the set of data engines that Flink can work with and to make it easier for users to apply a graph database in real-time systems, we propose integrating NebulaGraph into Apache Flink.
We add a source and a sink for accessing NebulaGraph from Flink:
1. Source
{code:java}
// options to connect NebulaGraph
NebulaClientOptions nebulaClientOptions =
    new NebulaClientOptions.NebulaClientOptionsBuilder()
        .setMetaAddress("127.0.0.1:45500")
        .build();
// NebulaGraph connection provider
NebulaStorageConnectionProvider storageConnectionProvider =
    new NebulaStorageConnectionProvider(nebulaClientOptions);
// options for NebulaGraph Source
ExecutionOptions vertexExecutionOptions =
    new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace("flinkSource")
        .setTag("player")
        .setNoColumn(false)
        .setFields(Arrays.asList("name", "age"))
        .setLimit(100)
        .builder();
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
NebulaSourceFunction sourceFunction =
    new NebulaSourceFunction(storageConnectionProvider)
        .setExecutionOptions(vertexExecutionOptions);
DataStreamSource<BaseTableRow> dataStreamSource = env.addSource(sourceFunction);
{code}
2. Sink
{code:java}
// options to connect NebulaGraph
NebulaClientOptions nebulaClientOptions =
    new NebulaClientOptions.NebulaClientOptionsBuilder()
        .setMetaAddress("127.0.0.1:45500")
        .build();
// NebulaGraph connection provider
NebulaGraphConnectionProvider graphConnectionProvider =
    new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider =
    new NebulaMetaConnectionProvider(nebulaClientOptions);
// options for NebulaGraph Sink
ExecutionOptions executionOptions =
    new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace("flinkSink")
        .setTag("player")
        .setIdIndex(0)
        .setFields(Arrays.asList("name", "age"))
        .setPositions(Arrays.asList(1, 2))
        .setBatch(2)
        .builder();
NebulaBatchOutputFormat outPutFormat =
    new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
        .setExecutionOptions(executionOptions);
NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
// dataSource: an existing DataStream to be written to NebulaGraph (not shown here)
dataSource.addSink(nebulaSinkFunction);
{code}
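One possible reading of the sink options above, sketched in plain Java with no Flink or NebulaGraph dependencies. This assumes that setIdIndex gives the row position of the vertex ID, that setFields and setPositions pair each property name with a row position, and that setBatch is the number of rows written together. The issue does not spell out these semantics, so treat them as assumptions. The class SinkOptionsSketch, its helper methods, and the sample rows are hypothetical and are not part of the proposed connector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the connector's implementation.
class SinkOptionsSketch {

    // Pair each field name with the row value found at the matching position.
    public static Map<String, Object> pickProperties(
            List<Object> row, List<String> fields, List<Integer> positions) {
        if (fields.size() != positions.size()) {
            throw new IllegalArgumentException("fields and positions must have the same length");
        }
        Map<String, Object> props = new LinkedHashMap<>();
        for (int i = 0; i < fields.size(); i++) {
            props.put(fields.get(i), row.get(positions.get(i)));
        }
        return props;
    }

    // Split rows into consecutive groups of at most batchSize rows.
    public static <T> List<List<T>> toBatches(List<T> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += batchSize) {
            int end = Math.min(start + batchSize, rows.size());
            batches.add(new ArrayList<>(rows.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Made-up sample rows: position 0 = vertex ID, 1 = name, 2 = age.
        List<List<Object>> rows = Arrays.asList(
                Arrays.<Object>asList(61L, "aba", 22),
                Arrays.<Object>asList(62L, "qaq", 23),
                Arrays.<Object>asList(63L, "wer", 24));

        int idIndex = 0;                                    // assumed meaning of setIdIndex(0)
        List<String> fields = Arrays.asList("name", "age"); // setFields(...)
        List<Integer> positions = Arrays.asList(1, 2);      // setPositions(...)
        int batch = 2;                                      // assumed meaning of setBatch(2)

        for (List<List<Object>> group : toBatches(rows, batch)) {
            System.out.println("batch of " + group.size() + " row(s)");
            for (List<Object> row : group) {
                Object vertexId = row.get(idIndex);
                System.out.println("  id=" + vertexId
                        + " props=" + pickProperties(row, fields, positions));
            }
        }
    }
}
```

Under these assumptions, three input rows with a batch size of 2 would be written as one group of two vertices followed by one group of a single vertex.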
--
This message was sent by Atlassian Jira
(v8.3.4#803005)