Posted to dev@doris.apache.org by B哥 <wy...@126.com> on 2019/07/22 07:39:47 UTC

Expose data pruned-filter-scan ability for Integrating Spark and Doris

Hi, all Doris committers,


I will soon create a pull request to expose a data pruned-filter-scan ability for integrating Spark and Doris. Please review this proposal, thanks.


The first step is to expose the data pruned-filter-scan ability; the second is to implement the Spark-Doris-Connector on the Spark side. The detailed design is below.


Outline

Doris for Apache Spark (Spark-Doris-Connector) allows Spark tasks to use it as a library and to interact with Doris through the Doris for Apache Spark APIs.

Background

In recent years, machine learning has been increasingly integrated into computing systems, for example MLlib for Spark. If Doris exposes its data to a computing system such as Spark, we can make full use of that system's machine learning library to run state-of-the-art machine learning on large data sets, collaborate, and productionize models at massive scale.

When Doris processes a large query and its memory usage exceeds the memory limit, the query fails, whereas Spark can gracefully write intermediate results back to disk. Integrating Spark and Doris not only resolves this problem, but also lets Spark take full advantage of the scan performance of the column-oriented Doris storage engine, whose extra push-down filter ability saves a lot of network IO.

Nowadays, enterprises store lots of data on different storage services such as MySQL, Elasticsearch, Doris, HDFS, NFS, Table, etc. They need a way to analyze all of this data with joint queries. Spark already connects to almost all of these services except Doris. If we build a bridge between Spark and Doris, Spark can provide a unified data manipulation platform for this scenario.

How to realize

Doris FE is responsible for pruning the related tablets, deciding which predicates can be used to filter data, and producing a single-node query plan fragment. It then packs all of these and returns them to the client, instead of scheduling them to backend instances as before. In this way, FE no longer coordinates the query lifecycle.

Doris BE is responsible for assembling all parameters from the client, such as tabletIds, version, and the encoded plan fragment, most of which are generated by Doris FE, and then executing this plan fragment instance. All rows returned by the plan fragment are first pushed to an attached blocking queue. When the client calls get_next to iterate over the result, BE fetches a batch from the blocking queue and returns it to the client.
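
The blocking-queue hand-off described for BE can be illustrated with a minimal Python sketch. This is not Doris BE code: ScanContext, drain, and the batch_size and max_batches parameters are hypothetical names chosen for illustration, and the real implementation would live in the BE process.

```python
import queue
import threading


class ScanContext:
    """Hypothetical stand-in for a BE scan context (illustration only)."""

    _EOS = object()  # sentinel marking the end of the result stream

    def __init__(self, row_source, batch_size=2, max_batches=4):
        # Bounded queue: the producer blocks when the client falls behind.
        self._queue = queue.Queue(maxsize=max_batches)
        self._eos = False
        worker = threading.Thread(
            target=self._produce, args=(row_source, batch_size), daemon=True)
        worker.start()

    def _produce(self, row_source, batch_size):
        # Plays the role of the plan fragment: pushes row batches to the queue.
        batch = []
        for row in row_source:
            batch.append(row)
            if len(batch) == batch_size:
                self._queue.put(batch)
                batch = []
        if batch:
            self._queue.put(batch)
        self._queue.put(self._EOS)

    def get_next(self):
        """Return (rows, eos); blocks until a batch is available."""
        if self._eos:
            return [], True
        item = self._queue.get()
        if item is self._EOS:
            self._eos = True
            return [], True
        return item, False


def drain(ctx):
    """Call get_next until the end of the stream and collect the batches."""
    batches = []
    while True:
        rows, eos = ctx.get_next()
        if eos:
            return batches
        batches.append(rows)
```

The bounded queue is one possible design choice: it keeps the scan from running arbitrarily far ahead of a slow client.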

Architecture
https://github.com/wuyunfeng/incubator-doris/blob/docs/docs/resources/images/spark_doris_connector.jpg
WorkFlow

Spark ApplicationMaster fetches the table schema from Doris FE.

Spark ApplicationMaster gets the query plan from Doris FE.

Spark ApplicationMaster transfers the query plan to all relevant Executor Workers.

Spark Executor Worker opens a context on Doris BE with the generated query plan.

Spark Executor Worker iterates over all data from Doris BE by calling get_next.

Spark Executor Worker closes the context on Doris BE.
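
The executor-side part of this workflow (open, iterate with get_next, close) might look like the following Python sketch. FakeBackend is an in-memory stand-in and not the real Thrift client; its method signatures and the scan helper are assumptions made for illustration, loosely following the open/getNext/close service proposed later in this mail.

```python
import itertools


class FakeBackend:
    """In-memory stand-in for a Doris BE (hypothetical, for illustration)."""

    def __init__(self, rows):
        self._rows = rows
        self.contexts = {}
        self._ids = itertools.count(1)

    def open(self, tablet_ids, opaqued_query_plan, batch_size):
        # A real BE would decode the plan and start the scan here.
        context_id = f"ctx-{next(self._ids)}"
        self.contexts[context_id] = batch_size
        return context_id

    def get_next(self, context_id, offset):
        batch_size = self.contexts[context_id]
        rows = self._rows[offset:offset + batch_size]
        eos = offset + len(rows) >= len(self._rows)
        return rows, eos

    def close(self, context_id):
        del self.contexts[context_id]


def scan(backend, tablet_ids, plan, batch_size=2):
    """Yield every row of one scan, always releasing the context at the end."""
    context_id = backend.open(tablet_ids, plan, batch_size)
    try:
        offset = 0
        while True:
            rows, eos = backend.get_next(context_id, offset)
            yield from rows
            offset += len(rows)  # rows consumed so far, sent with the next call
            if eos:
                break
    finally:
        backend.close(context_id)
```

Wrapping the loop in try/finally is a deliberate choice in this sketch, so the context is released even if the consumer stops early or an error occurs.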

Additional API
Doris FE HTTP Transport Protocol
GET Table Schema

Request:

GET /{cluster}/{database}/{table}/_schema


Response:

{
    "status": 200,
    "properties":{
        "k1":{
            "type":"SMALLINT",
            "comment":"this is a small SMALLINT column"
        },
        "k2":{
            "type":"LARGEINT",
            "comment":"this is a small LARGEINT column"
        },
        "k3":{
            "type":"FLOAT",
            "comment":"this is a small FLOAT column"
        },
        "k4":{
            "type":"CHAR",
            "comment":"this is a small CHAR column"
        }
    }
}
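
As a rough illustration of how a client could consume this response, the Python sketch below turns the body into an ordered list of (column, type) pairs. The function name parse_schema and the error handling are assumptions, not part of the proposal.

```python
import json


def parse_schema(body):
    """Return [(column_name, doris_type), ...] in the order given by FE."""
    doc = json.loads(body)
    if doc.get("status") != 200:
        raise ValueError(f"schema request failed with status {doc.get('status')}")
    # json.loads keeps the key order of the document, so column order survives.
    return [(name, spec["type"]) for name, spec in doc["properties"].items()]
```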

GET Query Plan

Request:

POST /{cluster}/{database}/{table}/_query_plan
{
  "sql": "select k2, k4 from table where k1 > 2 and k3 > 4"
}


Response:

{
    "status": 200,
    "encrypted_plan_desc":"This is a base64-encoded Doris FE query plan",
    "partitions":{
        "{tabletId}":[
            {
                "host":"192.168.0.1",
                "port":"9301"
            },
            {
                "host":"192.168.0.2",
                "port":"9301"
            },
            {
                "host":"192.168.0.3",
                "port":"9301"
            }
        ],
        "139":[
            {
                "host":"192.168.0.3",
                "port":"9301"
            },
            {
                "host":"192.168.0.4",
                "port":"9301"
            },
            {
                "host":"192.168.0.6",
                "port":"9301"
            }
        ],
        "140":[
            {
                "host":"192.168.0.1",
                "port":"9301"
            },
            {
                "host":"192.168.0.2",
                "port":"9301"
            },
            {
                "host":"192.168.0.3",
                "port":"9301"
            }
        ]
    }
}
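
One way the Spark side could use the "partitions" map is to pick a single replica for each tablet and group tablets by the chosen backend. The Python sketch below does this with a simple least-loaded rule. The rule and the name plan_partitions are assumptions for illustration; the proposal does not specify how replicas are chosen.

```python
from collections import defaultdict


def plan_partitions(partitions):
    """Map (host, port) -> list of tablet ids, one replica chosen per tablet."""
    load = defaultdict(int)        # tablets assigned to each backend so far
    assignment = defaultdict(list)
    for tablet_id in sorted(partitions, key=int):
        replicas = partitions[tablet_id]
        # Prefer the replica whose backend currently has the fewest tablets;
        # ties go to the first replica listed.
        chosen = min(replicas, key=lambda r: load[(r["host"], r["port"])])
        backend = (chosen["host"], chosen["port"])
        load[backend] += 1
        assignment[backend].append(int(tablet_id))
    return dict(assignment)
```

For the tablets 139 and 140 in the response above, this rule assigns tablet 139 to 192.168.0.3 and tablet 140 to 192.168.0.1.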

Doris BE Thrift Transport Protocol

External Service:

// the scan service exposes the ability to scan data to other compute systems
service TDorisExternalService {
    // doris will build a scan context for this session; context_id is returned on success
    TScanOpenResult open(1: TScanOpenParams params);

    // return the next batch of data, at most batch_size rows
    TScanBatchResult getNext(1: TScanNextBatchParams params);

    // release the context resource associated with the context_id
    TScanCloseResult close(1: TScanCloseParams params);
}


Message:

struct TScanColumnDesc {
  // The column name
  1: optional string name
  // The column type. Always set.
  2: optional Types.TPrimitiveType type
}

struct TScanColumnData {
  // One element in the list for every row in the column indicating if there is
  // a value in the vals list or a null.
  1: required list<bool> is_null;

  // Only one of the lists below is set, and only non-null values are stored. It holds one column's data for a row batch
  2: optional list<bool> bool_vals;
  3: optional list<byte> byte_vals;
  4: optional list<i16> short_vals;
  5: optional list<i32> int_vals;
  // for date and long value
  6: optional list<i64> long_vals;
  // for float and double
  7: optional list<double> double_vals;
  // for char, varchar, decimal
  8: optional list<string> string_vals;
  9: optional list<binary> binary_vals;
}

// Serialized batch of rows returned by getNext().
// one row batch contains multiple rows, and the result is arranged in column style
struct TScanRowBatch {
  // selected_columns + cols = data frame
  // Each TScanColumnData contains the data for an entire column. Always set.
  1: optional list<TScanColumnData> cols
  // The number of rows returned.
  2: optional i32 num_rows
}

struct TTabletVersionInfo {
  1: required i64 tablet_id
  2: required i64 version
  3: required i64 version_hash
  // i32 for historical reason
  4: required i32 schema_hash
}

struct TQueryPlanInfo {
  1: required Planner.TPlanFragment plan_fragment
  // tablet_id -> TTabletVersionInfo
  2: required map<i64, TTabletVersionInfo> tablet_info
  3: required Descriptors.TDescriptorTable desc_tbl
  // all tablet scan should share one query_id
  4: required Types.TUniqueId query_id
}


// Parameters to open().
struct TScanOpenParams {

  1: required string cluster

  2: required string database

  3: required string table

  // tablets to scan
  4: required list<i64> tablet_ids

  // base64 encoded binary plan fragment
  5: required string opaqued_query_plan

  // max number of rows returned by each getNext() call
  6: optional i32 batch_size

  // reserved params for future use
  7: optional map<string,string> properties

  // The query limit, if specified.
  8: optional i64 limit

  // The authenticated user name.
  // may be unnecessary
  9: optional string user

  10: optional string passwd
}


// Returned by open().
struct TScanOpenResult {
  1: required Status.TStatus status
  // An opaque context_id used in subsequent getNext()/close() calls. Required.
  2: optional string context_id
  // selected fields
  3: optional list<TScanColumnDesc> selected_columns

}

// Parameters to getNext()
struct TScanNextBatchParams {
  // The opaque handle returned by the previous open() call. Always set.
  1: optional string context_id    // doris olap engine context id
  2: optional i64 offset            // doris should check the offset to prevent duplicate rpc calls
}

// Returned by getNext().
struct TScanBatchResult {
  1: required Status.TStatus status

  // If true, reached the end of the result stream; subsequent calls to
  // getNext() won’t return any more results. Required.
  2: optional bool eos

  // A batch of rows to return, if any exist. The number of rows in the batch
  // should be less than or equal to the batch_size specified in TScanOpenParams.
  3: optional TScanRowBatch rows
}

// Parameters to close()
struct TScanCloseParams {
  // The opaque handle returned by the previous open() call. Always set.
  1: optional string context_id
}

// Returned by close().
struct TScanCloseResult {
  1: required Status.TStatus status
}
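
To show how a client might turn the column-style TScanRowBatch back into rows, here is a small Python sketch that uses plain dicts in place of the generated Thrift classes. It assumes the value lists are compacted, that is, they hold only the non-null values and is_null has one entry per row. This is one reading of the comment on TScanColumnData, not something the proposal confirms. The function names are hypothetical.

```python
def column_values(col):
    """Expand one column into a per-row list, with None for nulls."""
    value_lists = [v for k, v in col.items()
                   if k.endswith("_vals") and v is not None]
    if len(value_lists) != 1:
        raise ValueError("exactly one *_vals list must be set per column")
    values = iter(value_lists[0])
    # Assumption: nulls take no slot in the value list, so only advance
    # the value iterator for non-null rows.
    return [None if is_null else next(values) for is_null in col["is_null"]]


def batch_to_rows(batch):
    """Convert a column-style batch into a list of row tuples."""
    columns = [column_values(c) for c in batch["cols"]]
    rows = list(zip(*columns))
    if len(rows) != batch["num_rows"]:
        raise ValueError("num_rows does not match the column lengths")
    return rows
```

If the value lists instead keep a placeholder slot for each null, the iterator would need to advance on every row.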


Best Regards.


--

------------------------------------------
Yun 
email:wyfsky888@126.com