Posted to dev@inlong.apache.org by "luchunliang (via GitHub)" <gi...@apache.org> on 2024/04/20 09:54:12 UTC

[I] [Feature] Agent on InLong Transform [inlong]

luchunliang opened a new issue, #10023:
URL: https://github.com/apache/inlong/issues/10023

   ### Description
   
   ## Motivation
   - Agent (File collection) needs the ability to filter data so that only valid content is collected.
   - Agent (Pulsar collection) needs Protocol Buffers (PB) parsing and data extraction capabilities.
   
   ## Solution
   - Agent integrates Transform as an SDK. Manager will also integrate Transform so that it can validate a transformation up front, when users configure the transformation SQL.
   - Before any data is transformed, Agent registers the transformation configuration pulled from Manager with Transform. Whenever that configuration changes, Agent re-registers it with Transform under the same key, StreamSourceId.
   - Agent-Sink passes the StreamSourceId and RawData to Transform, which returns zero or more FormalData records; Agent-Sink then sends the resulting FormalData to DataProxy.
   - Transform keeps one registered configuration per StreamSourceId, and each StreamSourceId belongs to exactly one InLong data stream, identified by its GroupId and StreamId.
   - Each transformation configuration has three parts: the transformation Source, the transformation SQL, and the transformation Sink.
   - Initially, transformation SQL will provide only basic field filtering and field cropping. Date/time conversion and string conversion functions will be added later, based on Flink's built-in functions.
   ![image](https://github.com/apache/inlong/assets/8925507/32800fbb-bb90-4c6c-853d-f9f511b6815b)
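   
   The Agent-side integration described in the list above can be sketched in Java. This is a minimal, non-authoritative sketch: `AgentTransformBridge`, `DataProxySender`, `onConfigPulled`, `sink`, and the fields of the `SourceInfo`/`SinkInfo`/`TransformConfig` records are hypothetical names, not InLong classes; only the `register` and `transform` signatures follow the interface listed in this issue. It also assumes `TransformException` is a checked exception and that Agent detects a configuration change by comparing the freshly pulled config with the last one it registered.

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Objects;

   // Hypothetical types for illustration; they are not taken from the InLong code base.
   class TransformException extends Exception {
       TransformException(String message) { super(message); }
   }
   record SourceInfo(String delimiter, List<String> fieldNames) {}
   record SinkInfo(String delimiter) {}
   record TransformConfig(SourceInfo source, String transformSql, SinkInfo sink) {}

   /** The two Transform SDK calls Agent relies on (signatures follow the issue). */
   interface TransformSdk {
       TransformConfig register(String streamSourceId, TransformConfig config) throws TransformException;
       List<byte[]> transform(String streamSourceId, byte[] rawData) throws TransformException;
   }

   /** Hypothetical client that forwards one FormalData record to DataProxy. */
   interface DataProxySender {
       void send(byte[] formalData);
   }

   /** Agent-side glue: keeps Transform's registrations in sync and runs the Agent-Sink path. */
   final class AgentTransformBridge {
       private final TransformSdk transform;
       private final DataProxySender dataProxy;
       // Last config registered per StreamSourceId, used to detect changes pulled from Manager.
       private final Map<String, TransformConfig> registered = new HashMap<>();

       AgentTransformBridge(TransformSdk transform, DataProxySender dataProxy) {
           this.transform = transform;
           this.dataProxy = dataProxy;
       }

       /** Called after each pull from Manager; (re-)registers only new or changed configs. */
       synchronized void onConfigPulled(String streamSourceId, TransformConfig pulled)
               throws TransformException {
           if (!Objects.equals(registered.get(streamSourceId), pulled)) {
               transform.register(streamSourceId, pulled);
               registered.put(streamSourceId, pulled);
           }
       }

       /** Agent-Sink path: one RawData in, zero or more FormalData out to DataProxy. */
       int sink(String streamSourceId, byte[] rawData) throws TransformException {
           List<byte[]> formalData = transform.transform(streamSourceId, rawData);
           for (byte[] formal : formalData) {
               dataProxy.send(formal);
           }
           return formalData.size();
       }
   }
   ```

   Comparing the pulled configuration with the last registered one keeps Agent from recompiling unchanged SQL on every pull. The previous `TransformConfig` returned by `register` is ignored here, but it could be logged to record what was replaced.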
   
   ## Configuration Model
   ![image](https://github.com/apache/inlong/assets/8925507/40edb2ef-633b-481b-b22c-a5f4b99805a9)
   
   ## Transform SDK Interface
   - `TransformConfig register(String streamSourceId, TransformConfig config) throws TransformException;`
     - Throws an exception if the transformation SQL fails to compile.
     - Checks that the configuration is legal and that its required fields are non-null; throws an exception if the check fails.
     - Automatically generates code from the ProtoDefine and compiles it with a dynamic class loader; throws an exception if this fails.
     - If validation succeeds, returns the TransformConfig previously registered under the same streamSourceId, or null if the configuration is new.
   - `TransformConfig unregister(String streamSourceId);`
     - Unregisters the configuration and returns the previous TransformConfig, or null if none was registered.
   - `List<byte[]> transform(String streamSourceId, byte[] rawdata) throws TransformException;`
     - Synchronous interface.
     - Thread-safe.
     - Processing logic:
       - Parse rawdata according to the SourceInfo configuration into a `Map<String, byte[]>` or a ProtoObject/JsonObject.
       - Interpret the syntax tree generated from the transformation SQL to produce the result set `List<Row>`.
       - Convert the result set `List<Row>` into `List<byte[]>` according to the SinkInfo configuration and return it.
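   
   The register/unregister/transform contract above can be illustrated with a toy, in-memory implementation. This is a hedged sketch, not the planned SDK: `InMemoryTransformSdk`, `CompiledSql`, and all record fields are hypothetical. It assumes a delimited-text Source (one record per line, decoded into a `Map<String, String>` instead of `Map<String, byte[]>`), a Sink that re-joins the selected fields with a delimiter, and a checked `TransformException`. A regular expression accepting only `SELECT f1, f2 FROM src [WHERE f = 'literal']` stands in for a real SQL parser and syntax-tree interpreter, and ProtoDefine code generation is omitted.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   // Hypothetical types for illustration; they are not taken from the InLong code base.
   class TransformException extends Exception {
       TransformException(String message) { super(message); }
   }
   record SourceInfo(String delimiter, List<String> fieldNames) {} // delimited text, one record per line
   record SinkInfo(String delimiter) {}                            // output fields joined by this delimiter
   record TransformConfig(SourceInfo source, String transformSql, SinkInfo sink) {}

   /** Stand-in for a compiled SQL syntax tree: SELECT f1, f2 FROM src [WHERE f = 'literal'] only. */
   record CompiledSql(List<String> selectFields, String whereField, String whereValue) {
       private static final Pattern GRAMMAR = Pattern.compile(
           "(?i)\\s*SELECT\\s+(.+?)\\s+FROM\\s+\\w+(?:\\s+WHERE\\s+(\\w+)\\s*=\\s*'([^']*)')?\\s*");

       static CompiledSql compile(String sql) throws TransformException {
           Matcher m = GRAMMAR.matcher(sql == null ? "" : sql);
           if (!m.matches()) {
               throw new TransformException("Cannot compile transformation SQL: " + sql);
           }
           List<String> fields = Arrays.stream(m.group(1).split(",")).map(String::trim).toList();
           return new CompiledSql(fields, m.group(2), m.group(3)); // WHERE groups are null if absent
       }
   }

   /** Thread-safe: registrations live in a ConcurrentHashMap and transform() uses only local state. */
   final class InMemoryTransformSdk {
       private record Registered(TransformConfig config, CompiledSql sql) {}

       private final Map<String, Registered> registry = new ConcurrentHashMap<>();

       TransformConfig register(String streamSourceId, TransformConfig config) throws TransformException {
           if (streamSourceId == null || config == null || config.source() == null
                   || config.source().delimiter() == null || config.source().fieldNames() == null
                   || config.sink() == null || config.sink().delimiter() == null) {
               throw new TransformException("streamSourceId and every config part must be non-null");
           }
           CompiledSql sql = CompiledSql.compile(config.transformSql()); // reject bad SQL up front
           Registered previous = registry.put(streamSourceId, new Registered(config, sql));
           return previous == null ? null : previous.config(); // null for a new configuration
       }

       TransformConfig unregister(String streamSourceId) {
           Registered previous = registry.remove(streamSourceId);
           return previous == null ? null : previous.config();
       }

       List<byte[]> transform(String streamSourceId, byte[] rawData) throws TransformException {
           Registered reg = registry.get(streamSourceId);
           if (reg == null) {
               throw new TransformException("No configuration registered for " + streamSourceId);
           }
           SourceInfo source = reg.config().source();
           List<byte[]> formalData = new ArrayList<>();
           for (String line : new String(rawData, StandardCharsets.UTF_8).split("\n")) {
               // 1. Source: parse the line into a field-name -> value map.
               String[] values = line.split(Pattern.quote(source.delimiter()), -1);
               Map<String, String> row = new HashMap<>();
               for (int i = 0; i < source.fieldNames().size() && i < values.length; i++) {
                   row.put(source.fieldNames().get(i), values[i]);
               }
               // 2. SQL: apply the WHERE filter, then the SELECT projection.
               CompiledSql sql = reg.sql();
               if (sql.whereField() != null && !sql.whereValue().equals(row.get(sql.whereField()))) {
                   continue;
               }
               List<String> projected = new ArrayList<>();
               for (String field : sql.selectFields()) {
                   projected.add(row.getOrDefault(field, ""));
               }
               // 3. Sink: encode the projected row as one FormalData record.
               String encoded = String.join(reg.config().sink().delimiter(), projected);
               formalData.add(encoded.getBytes(StandardCharsets.UTF_8));
           }
           return formalData; // zero or more records
       }
   }
   ```

   With Source fields `ts,level,msg`, the SQL `SELECT ts, msg FROM src WHERE level = 'ERROR'` and a `|` Sink delimiter, the RawData `1,INFO,ok\n2,ERROR,disk full` yields a single FormalData record, `2|disk full`. Compiling the SQL inside `register` rejects a bad statement before any data flows, and keeping each stream's state in an immutable value inside a `ConcurrentHashMap` is one simple way to make `transform` thread-safe.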
   
   ### Use case
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes, I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

