Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/05/15 06:20:01 UTC
[carbondata] branch master updated: [CARBONDATA-3804] Provide end-to-end flink integration guide
This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new bcc2953 [CARBONDATA-3804] Provide end-to-end flink integration guide
bcc2953 is described below
commit bcc2953e9fb99e5de767eeb0a7f6cbc5f6adf695
Author: liuzhi <37...@qq.com>
AuthorDate: Thu May 7 11:59:09 2020 +0800
[CARBONDATA-3804] Provide end-to-end flink integration guide
Why is this PR needed?
Provide an end-to-end guide to help user understand and use flink integration module.
What changes were proposed in this PR?
Add file docs/flink-integration-guide.md
This closes #3752
---
README.md | 1 +
docs/flink-integration-guide.md | 225 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 226 insertions(+)
diff --git a/README.md b/README.md
index 5cd27b5..d07959e 100644
--- a/README.md
+++ b/README.md
@@ -72,6 +72,7 @@ CarbonData is built using Apache Maven, to [build CarbonData](https://github.com
* [Hive](https://github.com/apache/carbondata/blob/master/docs/hive-guide.md)
* [Presto](https://github.com/apache/carbondata/blob/master/docs/prestodb-guide.md)
* [Alluxio](https://github.com/apache/carbondata/blob/master/docs/alluxio-guide.md)
+* [Flink](https://github.com/apache/carbondata/blob/master/docs/flink-integration-guide.md)
## Other Technical Material
* [Apache CarbonData meetup material](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66850609)
diff --git a/docs/flink-integration-guide.md b/docs/flink-integration-guide.md
new file mode 100644
index 0000000..0a11d6c
--- /dev/null
+++ b/docs/flink-integration-guide.md
@@ -0,0 +1,225 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to you under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# Carbon Flink Integration Guide
+
+## Usage scenarios
+
+ The CarbonData flink integration module is used to connect Flink and Carbon. The module provides
+ a set of Flink BulkWriter implementations (CarbonLocalWriter and CarbonS3Writer). The data is processed
+ by Flink and finally written into the stage directory of the target table by the carbon writer.
+
+ By default, data in the table stage directory cannot be queried immediately; it becomes queryable
+ only after the `INSERT INTO $tableName STAGE` command is executed.
+
+ Since the flink data written to carbon is unbounded, the user should execute the insert from stage
+ command periodically, in order to keep the data visible and to keep the amount of data processed
+ by each insert from stage command under control.
+
+ The execution interval of the insert from stage command should take into account both the data
+ visibility requirements of the actual business and the flink data traffic. When the data visibility
+ requirements are high or the data traffic is large, the execution interval should be shortened accordingly.
+
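+ For example, a minimal sketch of such a periodic trigger (assuming a Spark environment where
+ CarbonData SQL commands can be executed, and using the `test` table from the example below;
+ the 5 minute interval is only illustrative):
+
+ ```scala
+ import org.apache.spark.sql.SparkSession
+
+ // Assumed SparkSession with CarbonData support; configure it according to your deployment.
+ val spark = SparkSession.builder().appName("InsertFromStageTrigger").getOrCreate()
+
+ while (true) {
+   // Move the staged flink data into the table so it becomes queryable.
+   spark.sql("INSERT INTO test STAGE")
+   // Wait before the next run; shorten the interval for higher visibility requirements or heavier traffic.
+   Thread.sleep(5 * 60 * 1000L)
+ }
+ ```
+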
+ A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+ for subsequent analysis and queries.
+
+## Usage description
+
+### Writing process
+
+ Typical flink stream: `Source -> Process -> Output(Carbon Writer Sink)`
+
+ Pseudo code and description:
+
+ ```scala
+ // Import dependencies.
+ import java.util.Properties
+ import org.apache.carbon.flink.CarbonWriterFactory
+ import org.apache.carbon.flink.ProxyFileSystem
+ import org.apache.carbondata.core.constants.CarbonCommonConstants
+ import org.apache.flink.api.common.restartstrategy.RestartStrategies
+ import org.apache.flink.core.fs.Path
+ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+
+ // Specify database name.
+ val databaseName = "default"
+
+ // Specify target table name.
+ val tableName = "test"
+ // Table path of the target table.
+ val tablePath = "/data/warehouse/test"
+ // Specify local temporary path.
+ val dataTempPath = "/data/temp/"
+
+ val tableProperties = new Properties
+ // Set the table properties here.
+
+ val writerProperties = new Properties
+ // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+
+ val carbonProperties = new Properties
+ // Set the carbon properties here, such as date format, store location, etc.
+
+ // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+ val writerFactory = CarbonWriterFactory.builder("Local").build(
+ databaseName,
+ tableName,
+ tablePath,
+ tableProperties,
+ writerProperties,
+ carbonProperties
+ )
+
+ // Build a flink stream and run it.
+ // 1. Create a new flink execution environment.
+ val environment = StreamExecutionEnvironment.getExecutionEnvironment
+ // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+
+ // 2. Create a flink data source; it may be a Kafka source, a custom source, or something else.
+ // The data type of the source should be Array[AnyRef].
+ // The array length should equal the table column count, and the order of values in the array should match the table column order.
+ val source = ...
+ // 3. Create flink stream and set source.
+ val stream = environment.addSource(source)
+ // 4. Add other flink operators here.
+ // ...
+ // 5. Set the flink stream target (write data to carbon with the carbon writer sink).
+ stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+ // 6. Run flink stream.
+ try {
+ environment.execute
+ } catch {
+ case exception: Exception =>
+ // Handle execute exception here.
+ }
+ ```
+
+### Writer properties
+
+#### Local Writer
+
+ | Property | Name | Description |
+ |--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+ | CarbonLocalProperty.DATA_TEMP_PATH   | carbon.writer.local.data.temp.path   | Usually a local path. Data is written to the temp path first and then moved to the target data path. |
+ | CarbonLocalProperty.COMMIT_THRESHOLD | carbon.writer.local.commit.threshold | When the written data count reaches the threshold, the writer flushes the data and moves it to the target data path. |
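+
+ For example, a minimal sketch of configuring the local writer properties (the threshold value
+ below is only illustrative):
+
+ ```scala
+ import java.util.Properties
+ import org.apache.carbon.flink.CarbonLocalProperty
+
+ val writerProperties = new Properties
+ // Local temp path; data is written here first and moved to the table path on commit.
+ writerProperties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, "/data/temp/")
+ // Illustrative commit threshold; tune it to control how often data is flushed and moved.
+ writerProperties.setProperty(CarbonLocalProperty.COMMIT_THRESHOLD, "1024")
+ ```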
+
+#### S3 Writer
+
+ | Property | Name | Description |
+ |-----------------------------------|-----------------------------------|---------------------------------------------------------------------------------------------------------|
+ | CarbonS3Property.ACCESS_KEY | carbon.writer.s3.access.key | Access key of s3 file system |
+ | CarbonS3Property.SECRET_KEY | carbon.writer.s3.secret.key | Secret key of s3 file system |
+ | CarbonS3Property.ENDPOINT | carbon.writer.s3.endpoint | Endpoint of s3 file system |
+ | CarbonS3Property.DATA_TEMP_PATH   | carbon.writer.s3.data.temp.path   | Usually a local path. Data is written to the temp path first and then moved to the target data path. |
+ | CarbonS3Property.COMMIT_THRESHOLD | carbon.writer.s3.commit.threshold | When the written data count reaches the threshold, the writer flushes the data and moves it to the target data path. |
+
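+ For example, a minimal sketch of building an S3 writer factory (the keys, endpoint, and S3 table
+ path below are placeholders for illustration only):
+
+ ```scala
+ import java.util.Properties
+ import org.apache.carbon.flink.CarbonS3Property
+ import org.apache.carbon.flink.CarbonWriterFactory
+
+ val writerProperties = new Properties
+ // Placeholder S3 credentials and endpoint; replace them with the real configuration.
+ writerProperties.setProperty(CarbonS3Property.ACCESS_KEY, "<access-key>")
+ writerProperties.setProperty(CarbonS3Property.SECRET_KEY, "<secret-key>")
+ writerProperties.setProperty(CarbonS3Property.ENDPOINT, "<endpoint>")
+ writerProperties.setProperty(CarbonS3Property.DATA_TEMP_PATH, "/data/temp/")
+
+ // Build an 'S3' writer factory instead of a 'Local' one; the other arguments are the same as
+ // in the local example, except that the table path points to S3.
+ val writerFactory = CarbonWriterFactory.builder("S3").build(
+   "default",
+   "test",
+   "s3a://bucket/warehouse/test",
+   new Properties,
+   writerProperties,
+   new Properties
+ )
+ ```
+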
+### Insert from stage
+
+ Refer to [Grammar Description](./dml-of-carbondata.md#insert-data-into-carbondata-table-from-stage-input-files) for the syntax.
+
+## Usage Example Code
+
+ Create target table.
+
+ ```sql
+ CREATE TABLE test (col1 string, col2 string, col3 string) STORED AS carbondata
+ ```
+
+ Writing flink data to a local carbon table.
+
+ ```scala
+ import java.util.Properties
+ import org.apache.carbon.flink.CarbonLocalProperty
+ import org.apache.carbon.flink.CarbonWriterFactory
+ import org.apache.carbon.flink.ProxyFileSystem
+ import org.apache.carbondata.core.constants.CarbonCommonConstants
+ import org.apache.flink.api.common.restartstrategy.RestartStrategies
+ import org.apache.flink.core.fs.Path
+ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+ import org.apache.flink.streaming.api.functions.source.SourceFunction
+
+ val databaseName = "default"
+ val tableName = "test"
+ val tablePath = "/data/warehouse/test"
+ val dataTempPath = "/data/temp/"
+
+ val tableProperties = new Properties
+
+ val writerProperties = new Properties
+ writerProperties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
+
+ val carbonProperties = new Properties
+ carbonProperties.setProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+ carbonProperties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024")
+
+ val writerFactory = CarbonWriterFactory.builder("Local").build(
+ databaseName,
+ tableName,
+ tablePath,
+ tableProperties,
+ writerProperties,
+ carbonProperties
+ )
+
+ val environment = StreamExecutionEnvironment.getExecutionEnvironment
+ environment.setParallelism(1)
+ environment.enableCheckpointing(2000L)
+ environment.setRestartStrategy(RestartStrategies.noRestart)
+
+ // Define a custom source.
+ val source = new SourceFunction[Array[AnyRef]]() {
+ override
+ def run(sourceContext: SourceFunction.SourceContext[Array[AnyRef]]): Unit = {
+ // The array length should equal the table column count, and the order of values in the array should match the table column order.
+ val data = new Array[AnyRef](3)
+ data(0) = "value1"
+ data(1) = "value2"
+ data(2) = "value3"
+ sourceContext.collect(data)
+ }
+
+ override
+ def cancel(): Unit = {
+ // do something.
+ }
+ }
+
+ val stream = environment.addSource(source)
+ val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build
+
+ stream.addSink(streamSink)
+
+ try {
+ environment.execute
+ } catch {
+ case exception: Exception =>
+ // Handle the execute exception here; for now just rethrow it.
+ throw new UnsupportedOperationException(exception)
+ }
+ ```
+
+ Insert into table from stage directory.
+
+ ```sql
+ INSERT INTO test STAGE
+ ```
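+
+ After the insert from stage command finishes, the data written by flink becomes queryable, for example:
+
+ ```sql
+ SELECT * FROM test
+ ```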