You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/19 00:41:24 UTC
[incubator-streampark] branch dev updated: Chinese Notes to English Notes (#1640)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 9d306516a Chinese Notes to English Notes (#1640)
9d306516a is described below
commit 9d306516ae874bf9cdf42077cb57a32426fe512c
Author: ChunFu Wu <31...@qq.com>
AuthorDate: Mon Sep 19 08:41:16 2022 +0800
Chinese Notes to English Notes (#1640)
---
.../streampark/common/enums/ClusterState.java | 6 ++---
.../common/enums/FlinkSqlValidationFailedType.java | 11 ++++-----
.../streampark/common/util/ZooKeeperUtils.scala | 10 ++++----
.../flink/connector/function/RunningFunction.java | 2 +-
.../flink/connector/function/SQLQueryFunction.java | 2 +-
.../connector/function/SQLResultFunction.java | 2 +-
.../function/StreamEnvConfigFunction.java | 2 +-
.../connector/function/TableEnvConfigFunction.java | 2 +-
.../hbase/internal/HBaseSourceFunction.scala | 6 ++---
.../flink/connector/jdbc/sink/JdbcSink.scala | 6 ++---
.../flink/connector/kafka/sink/KafkaJavaSink.java | 4 ++--
.../flink/connector/kafka/source/KafkaSource.scala | 27 +++++++++++-----------
.../flink/kubernetes/watcher/FlinkWatcher.scala | 3 ---
13 files changed, 39 insertions(+), 44 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/enums/ClusterState.java b/streampark-common/src/main/scala/org/apache/streampark/common/enums/ClusterState.java
index 9a531eb7b..cc2f70138 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/enums/ClusterState.java
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/enums/ClusterState.java
@@ -24,15 +24,15 @@ import java.io.Serializable;
*/
public enum ClusterState implements Serializable {
/**
- * 集群刚创建但未启动
+ * The cluster was just created but not started
*/
CREATED(0),
/**
- * 集群已启动
+ * cluster started
*/
STARTED(1),
/**
- * 集群已停止
+ * cluster stopped
*/
STOPED(2);
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/enums/FlinkSqlValidationFailedType.java b/streampark-common/src/main/scala/org/apache/streampark/common/enums/FlinkSqlValidationFailedType.java
index 4fb6dae28..43ac6418f 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/enums/FlinkSqlValidationFailedType.java
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/enums/FlinkSqlValidationFailedType.java
@@ -19,26 +19,25 @@ package org.apache.streampark.common.enums;
import java.io.Serializable;
-
public enum FlinkSqlValidationFailedType implements Serializable {
/**
- * 基本检验失败(如为null等)
+ * Basic validation failed (e.g. the value is null)
*/
VERIFY_FAILED(1),
/**
- * 语法错误
+ * syntax error
*/
SYNTAX_ERROR(2),
/**
- * 不支持的方言
+ * unsupported dialect
*/
UNSUPPORTED_DIALECT(3),
/**
- * 不支持的sql命令
+ * unsupported sql command
*/
UNSUPPORTED_SQL(4),
/**
- * 非";"结尾
+ * Statement does not end with ";"
*/
ENDS_WITH(5);
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ZooKeeperUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ZooKeeperUtils.scala
index ac7e39978..1e4d2908b 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/ZooKeeperUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ZooKeeperUtils.scala
@@ -36,15 +36,15 @@ object ZooKeeperUtils {
case Some(x) => x
case None =>
try {
- // 设置重连策略ExponentialBackoffRetry, baseSleepTimeMs:初始sleep的时间,maxRetries:最大重试次数,maxSleepMs:最大重试时间
+ // Set the reconnection policy ExponentialBackoffRetry; baseSleepTimeMs: initial sleep time, maxRetries: maximum number of retries, maxSleepMs: maximum sleep time per retry
//val retryPolicy = new ExponentialBackoffRetry(10000, 5)
- //(推荐)curator链接zookeeper的策略:RetryNTimes n:重试的次数 sleepMsBetweenRetries:每次重试间隔的时间
+ // (Recommended) Curator retry policy for connecting to ZooKeeper: RetryNTimes; n: number of retries, sleepMsBetweenRetries: interval between retries
//val retryPolicy:RetryPolicy = new RetryNTimes(3, 5000)
- // (不推荐) curator链接zookeeper的策略:RetryOneTime sleepMsBetweenRetry:每次重试间隔的时间,这个策略只会重试一次
+ // (Not recommended) Curator retry policy for connecting to ZooKeeper: RetryOneTime; sleepMsBetweenRetry: delay before the retry. This policy retries only once
// val retryPolicy:RetryPolicy = new RetryOneTime(3000)
- // 永远重试,不推荐使用
+ // Retry forever, not recommended
// val retryPolicy:RetryPolicy = new RetryForever(retryIntervalMs)
- // curator链接zookeeper的策略:RetryUntilElapsed maxElapsedTimeMs:最大重试时间 sleepMsBetweenRetries:每次重试间隔 重试时间超过maxElapsedTimeMs后,就不再重试
+ // Curator retry policy for connecting to ZooKeeper: RetryUntilElapsed; maxElapsedTimeMs: maximum total retry time, sleepMsBetweenRetries: interval between retries. Once the elapsed retry time exceeds maxElapsedTimeMs, no further retries are made
// val retryPolicy:RetryPolicy = new RetryUntilElapsed(2000, 3000)
val retryPolicy: RetryPolicy = new RetryNTimes(5, 2000)
val client = CuratorFrameworkFactory
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/RunningFunction.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/RunningFunction.java
index ba33a23dd..4fc49914b 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/RunningFunction.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/RunningFunction.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
public interface RunningFunction extends Serializable {
/**
- * 是否running...
+ * Is it running...
*
* @return Boolean: isRunning
*/
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/SQLQueryFunction.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/SQLQueryFunction.java
index 194def802..17a266ae7 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/SQLQueryFunction.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/SQLQueryFunction.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
@FunctionalInterface
public interface SQLQueryFunction<T> extends Serializable {
/**
- * 获取要查询的SQL
+ * Get the SQL to query
*
* @param last: last one
* @return String: sql
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/SQLResultFunction.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/SQLResultFunction.java
index 73b05d7bc..ff08c7ab6 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/SQLResultFunction.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/SQLResultFunction.java
@@ -23,7 +23,7 @@ import java.util.Map;
@FunctionalInterface
public interface SQLResultFunction<T> extends Serializable {
/**
- * 将查下结果以Map的方式返回,用户去实现转成对象.
+ * The query result is returned as a Map; the user implements the conversion into an object.
*
* @param map: sqlQuery result
* @return Iterable: Iterable
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/StreamEnvConfigFunction.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/StreamEnvConfigFunction.java
index ac63fd4aa..da42db93c 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/StreamEnvConfigFunction.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/StreamEnvConfigFunction.java
@@ -25,7 +25,7 @@ import java.io.Serializable;
@FunctionalInterface
public interface StreamEnvConfigFunction extends Serializable {
/**
- * 用于初始化StreamExecutionEnvironment的时候,用于可以实现该函数,自定义要设置的参数...
+ * Called when the StreamExecutionEnvironment is initialized; users can implement this function to customize the parameters to be set...
*
* @param environment: StreamExecutionEnvironment instance
* @param parameterTool: ParameterTool
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/TableEnvConfigFunction.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/TableEnvConfigFunction.java
index 3a1b8ba8e..506ced9df 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/TableEnvConfigFunction.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/TableEnvConfigFunction.java
@@ -25,7 +25,7 @@ import java.io.Serializable;
@FunctionalInterface
public interface TableEnvConfigFunction extends Serializable {
/**
- * 用于初始化TableEnvironment的时候,用于可以实现该函数,自定义要设置的参数...
+ * Called when the TableEnvironment is initialized; users can implement this function to customize the parameters to be set...
*
* @param tableConfig: flink tableConfig
* @param parameterTool: parameterTool
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
index dfdac9e09..823adeec4 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
@@ -97,7 +97,7 @@ class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
case ApiType.scala =>
if (scalaRunningFunc()) {
ctx.getCheckpointLock.synchronized {
- //将上次(或者从checkpoint中恢复)的query查询对象返回用户,用户根据这个构建下次要查询的条件.
+ // Pass the last query object (or the one recovered from checkpoint) back to the user, who builds the conditions for the next query from it.
query = scalaQueryFunc(last)
require(query != null && query.getTable != null, "[StreamPark] HBaseSource query and query's param table must not be null ")
table = query.getTable(prop)
@@ -110,7 +110,7 @@ class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
case ApiType.java =>
if (javaRunningFunc.running()) {
ctx.getCheckpointLock.synchronized {
- //将上次(或者从checkpoint中恢复)的query查询对象返回用户,用户根据这个构建下次要查询的条件.
+ // Pass the last query object (or the one recovered from checkpoint) back to the user, who builds the conditions for the next query from it.
query = javaQueryFunc.query(last)
require(query != null && query.getTable != null, "[StreamPark] HBaseSource query and query's param table must not be null ")
table = query.getTable(prop)
@@ -145,7 +145,7 @@ class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
}
override def initializeState(context: FunctionInitializationContext): Unit = {
- //从checkpoint中恢复...
+ // Recover from checkpoint...
logInfo("HBaseSource snapshotState initialize")
state = FlinkUtils.getUnionListState[R](context, OFFSETS_STATE_NAME)
Try(state.get.head) match {
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
index 55f7d43f1..f86aa9f3e 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
@@ -36,7 +36,7 @@ object JdbcSink {
/**
* @param ctx : StreamingContext
- * @param alias : 实例别名(用于区分多个不同的数据库实例...)
+ * @param alias : Instance alias (used to distinguish between multiple different database instances...)
* @return
*/
def apply(@(transient@param)
@@ -56,8 +56,8 @@ class JdbcSink(@(transient@param) ctx: StreamingContext,
/**
*
* @param stream : DataStream
- * @param toSQLFn : 转换成SQL的函数,有用户提供.
- * @tparam T : DataStream里的流的数据类型
+ * @param toSQLFn : Function that converts a record into SQL, provided by the user.
+ * @tparam T : The data type of the stream in the DataStream
* @return
*/
def sink[T](stream: DataStream[T])(implicit toSQLFn: T => String): DataStreamSink[T] = {
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaJavaSink.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaJavaSink.java
index 403a1925d..efb946111 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaJavaSink.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaJavaSink.java
@@ -47,7 +47,7 @@ public class KafkaJavaSink<T> {
public KafkaJavaSink(StreamingContext context) {
this.context = context;
- //默认partitioner为KafkaEqualityPartitioner
+ // The default partitioner is KafkaEqualityPartitioner
partitioner = new KafkaEqualityPartitioner<T>(context.getParallelism());
}
@@ -87,7 +87,7 @@ public class KafkaJavaSink<T> {
}
/**
- * 设置要下沉的topic
+ * Set the topic to sink
*
* @param topic: topic name
* @return KafkaSink: KafkaSink instance
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
index 9b299f792..eebd4f812 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
@@ -144,23 +144,22 @@ object KafkaSource {
class KafkaSource(@(transient@param) private[this] val ctx: StreamingContext, property: Properties = new Properties()) {
/**
+ * commit offset method:<br/>
+ * Flink kafka consumer commit offset method needs to distinguish whether checkpoint is enabled. <br/>
+ * 1) Checkpoint off: commit offset depends on auto commit of kafka client.
+ * Need to set enable.auto.commit, auto.commit.interval.ms parameters to consumer properties,
+ * It will periodically auto commit offset to kafka at regular intervals. <br/>
+ * 2) Checkpoint on: the offsets consumed by the job are managed and made fault-tolerant by Flink itself in its state.
+ * Committing offsets to kafka in this mode generally serves only as external progress monitoring, i.e. to know the job's consumption position and lag in real time.
+ * In this case setCommitOffsetsOnCheckpoints must be set to true so that offsets are committed to kafka when a checkpoint succeeds.
+ * The interval of commit offset then depends on the interval of checkpoint.
*
- * commit offset 方式:<br/>
- * Flink kafka consumer commit offset 方式需要区分是否开启了 checkpoint。<br/>
- * 1) checkpoint 关闭: commit offset 要依赖于 kafka 客户端的 auto commit。
- * 需设置 enable.auto.commit,auto.commit.interval.ms 参数到 consumer properties,
- * 就会按固定的时间间隔定期 auto commit offset 到 kafka。<br/>
- * 2) checkpoint 开启: 这个时候作业消费的 offset 是 Flink 在 state 中自己管理和容错。
- * 此时提交 offset 到 kafka,一般都是作为外部进度的监控,想实时知道作业消费的位置和 lag 情况。
- * 此时需要 setCommitOffsetsOnCheckpoints 为 true 来设置当 checkpoint 成功时提交 offset 到 kafka。
- * 此时 commit offset 的间隔就取决于 checkpoint 的间隔
+ * Get the DataStream
*
- * 获取DStream 流
- *
- * @param topic 一组topic或者单个topic
- * @param alias 别名,区分不同的kafka连接实例
+ * @param topic a group of topics or a single topic
+ * @param alias Aliases to distinguish different kafka connection instances
* @param deserializer DeserializationSchema
- * @param strategy Watermarks 策略
+ * @param strategy Watermarks strategy
* @tparam T
*/
def getDataStream[T: TypeInformation](topic: java.io.Serializable = null,
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
index e19108478..bc0b1b4d9 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
@@ -20,9 +20,6 @@ package org.apache.streampark.flink.kubernetes.watcher
import java.util.concurrent.atomic.AtomicBoolean
import scala.language.implicitConversions
-/**
- * auth: Al-assad
- */
trait FlinkWatcher extends AutoCloseable {
private[this] val started: AtomicBoolean = new AtomicBoolean(false)