You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@bahir.apache.org by "nicole Wang (Jira)" <ji...@apache.org> on 2021/01/19 03:04:00 UTC

[jira] [Created] (BAHIR-258) Add NebulaGraph Connector for Flink

nicole Wang created BAHIR-258:
---------------------------------

             Summary: Add NebulaGraph Connector for Flink
                 Key: BAHIR-258
                 URL: https://issues.apache.org/jira/browse/BAHIR-258
             Project: Bahir
          Issue Type: New Feature
          Components: Flink Streaming Connectors
    Affects Versions: Flink-1.0, Flink-Next
            Reporter: nicole Wang
             Fix For: Flink-1.0


NebulaGraph([https://nebula-graph.io/]) is a graph database built for super large-scale graphs with milliseconds of latency. NebulaGraph is open source, distributed ,scalable and lightning fast.  NebulaGraph source code: [https://github.com/vesoft-inc] 

Graph database now is widely used in real-time recommendation, knowledge graph, financial risk control and other fields. And these scenes may use Flink to process data real-time.

In order to rich the data engine of Flink and to facilitate users to apply graph database in the real-time system, we propose to integrate NebulaGraph into Apache Flink.

We add source and sink to access NebulaGraph with Flink:

1. Source
{code:java}
// options to connect NebulaGraph
NebulaClientOptions nebulaClientOptions =
       new NebulaClientOptions.NebulaClientOptionsBuilder()
                .setMetaAddress("127.0.0.1:45500")
                .build();
// NebulaGraph connection provider
storageConnectionProvider =
        new NebulaStorageConnectionProvider(nebulaClientOptions);
// options for NebulaGraph Source
ExecutionOptions vertexExecutionOptions = 
        new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace("flinkSource")
        .setTag("player")
        .setNoColumn(false)
        .setFields(Arrays.asList("name","age"))
        .setLimit(100)
        .builder();

StreamExecutionEnvironment env =            
        StreamExecutionEnvironment.getExecutionEnvironment();

NebulaSourceFunction sourceFunction = 
        new NebulaSourceFunction(storageConnectionProvider)
        .setExecutionOptions(vertexExecutionOptions);
DataStreamSource<BaseTableRow> dataStreamSource = env.addSource(sourceFunction);

{code}
2. Sink
{code:java}
// options to connect NebulaGraph
NebulaClientOptions nebulaClientOptions =
       new NebulaClientOptions.NebulaClientOptionsBuilder()
                .setMetaAddress("127.0.0.1:45500")
                .build();
// NebulaGraph connection provider
NebulaGraphConnectionProvider graphConnectionProvider =
        new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider =
        new NebulaMetaConnectionProvider(nebulaClientOptions);

// options for NebulaGraph Sink
ExecutionOptions executionOptions = 
        new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace("flinkSink")
        .setTag("player")
        .setIdIndex(0)
        .setFields(Arrays.asList("name", "age"))
        .setPositions(Arrays.asList(1, 2))
        .setBatch(2)
        .builder();

NebulaBatchOutputFormat outPutFormat =
      new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
      .setExecutionOptions(executionOptions);

NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
dataSource.addSink(nebulaSinkFunction);{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)