Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/19 00:41:24 UTC

[incubator-streampark] branch dev updated: Chinese Notes to English Notes (#1640)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9d306516a Chinese Notes to English Notes (#1640)
9d306516a is described below

commit 9d306516ae874bf9cdf42077cb57a32426fe512c
Author: ChunFu Wu <31...@qq.com>
AuthorDate: Mon Sep 19 08:41:16 2022 +0800

    Chinese Notes to English Notes (#1640)
---
 .../streampark/common/enums/ClusterState.java      |  6 ++---
 .../common/enums/FlinkSqlValidationFailedType.java | 11 ++++-----
 .../streampark/common/util/ZooKeeperUtils.scala    | 10 ++++----
 .../flink/connector/function/RunningFunction.java  |  2 +-
 .../flink/connector/function/SQLQueryFunction.java |  2 +-
 .../connector/function/SQLResultFunction.java      |  2 +-
 .../function/StreamEnvConfigFunction.java          |  2 +-
 .../connector/function/TableEnvConfigFunction.java |  2 +-
 .../hbase/internal/HBaseSourceFunction.scala       |  6 ++---
 .../flink/connector/jdbc/sink/JdbcSink.scala       |  6 ++---
 .../flink/connector/kafka/sink/KafkaJavaSink.java  |  4 ++--
 .../flink/connector/kafka/source/KafkaSource.scala | 27 +++++++++++-----------
 .../flink/kubernetes/watcher/FlinkWatcher.scala    |  3 ---
 13 files changed, 39 insertions(+), 44 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/enums/ClusterState.java b/streampark-common/src/main/scala/org/apache/streampark/common/enums/ClusterState.java
index 9a531eb7b..cc2f70138 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/enums/ClusterState.java
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/enums/ClusterState.java
@@ -24,15 +24,15 @@ import java.io.Serializable;
  */
 public enum ClusterState implements Serializable {
     /**
-     * 集群刚创建但未启动
+     * The cluster has just been created but not yet started
      */
     CREATED(0),
     /**
-     * 集群已启动
+     * The cluster has been started
      */
     STARTED(1),
     /**
-     * 集群已停止
+     * The cluster has been stopped
      */
     STOPED(2);
 
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/enums/FlinkSqlValidationFailedType.java b/streampark-common/src/main/scala/org/apache/streampark/common/enums/FlinkSqlValidationFailedType.java
index 4fb6dae28..43ac6418f 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/enums/FlinkSqlValidationFailedType.java
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/enums/FlinkSqlValidationFailedType.java
@@ -19,26 +19,25 @@ package org.apache.streampark.common.enums;
 
 import java.io.Serializable;
 
-
 public enum FlinkSqlValidationFailedType implements Serializable {
     /**
-     * 基本检验失败(如为null等)
+     * Basic validation failed (e.g. the SQL is null)
      */
     VERIFY_FAILED(1),
     /**
-     * 语法错误
+     * Syntax error
      */
     SYNTAX_ERROR(2),
     /**
-     * 不支持的方言
+     * Unsupported dialect
      */
     UNSUPPORTED_DIALECT(3),
     /**
-     * 不支持的sql命令
+     * Unsupported SQL command
      */
     UNSUPPORTED_SQL(4),
     /**
-     * 非";"结尾
+     * Does not end with ";"
      */
     ENDS_WITH(5);
 
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ZooKeeperUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ZooKeeperUtils.scala
index ac7e39978..1e4d2908b 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/ZooKeeperUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ZooKeeperUtils.scala
@@ -36,15 +36,15 @@ object ZooKeeperUtils {
       case Some(x) => x
       case None =>
         try {
-          // 设置重连策略ExponentialBackoffRetry, baseSleepTimeMs:初始sleep的时间,maxRetries:最大重试次数,maxSleepMs:最大重试时间
+          // Set the reconnection policy to ExponentialBackoffRetry. baseSleepTimeMs: initial sleep time; maxRetries: maximum number of retries; maxSleepMs: maximum retry time
           //val retryPolicy = new ExponentialBackoffRetry(10000, 5)
-          //(推荐)curator链接zookeeper的策略:RetryNTimes n:重试的次数 sleepMsBetweenRetries:每次重试间隔的时间
+          // (Recommended) Curator's retry policy for connecting to ZooKeeper: RetryNTimes. n: the number of retries; sleepMsBetweenRetries: the interval between retries
           //val retryPolicy:RetryPolicy = new RetryNTimes(3, 5000)
-          // (不推荐) curator链接zookeeper的策略:RetryOneTime sleepMsBetweenRetry:每次重试间隔的时间,这个策略只会重试一次
+          // (Not recommended) Curator's retry policy for connecting to ZooKeeper: RetryOneTime. sleepMsBetweenRetry: the retry interval; this policy retries only once
           // val retryPolicy:RetryPolicy = new RetryOneTime(3000)
-          // 永远重试,不推荐使用
+          // Retry forever, not recommended
           // val retryPolicy:RetryPolicy = new RetryForever(retryIntervalMs)
-          // curator链接zookeeper的策略:RetryUntilElapsed maxElapsedTimeMs:最大重试时间 sleepMsBetweenRetries:每次重试间隔 重试时间超过maxElapsedTimeMs后,就不再重试
+          // Curator's retry policy for connecting to ZooKeeper: RetryUntilElapsed. maxElapsedTimeMs: maximum total retry time; sleepMsBetweenRetries: the interval between retries; once the elapsed time exceeds maxElapsedTimeMs, no further retries are made
           // val retryPolicy:RetryPolicy = new RetryUntilElapsed(2000, 3000)
           val retryPolicy: RetryPolicy = new RetryNTimes(5, 2000)
           val client = CuratorFrameworkFactory
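
The retry policies named in these comments are standard Curator classes. A
minimal, self-contained sketch of building a client with them (the connect
string is an assumed placeholder, not taken from this commit):

    import org.apache.curator.RetryPolicy
    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.retry.{ExponentialBackoffRetry, RetryNTimes}

    object CuratorRetrySketch extends App {
      // RetryNTimes(n, sleepMsBetweenRetries): retry 5 times, 2000 ms apart,
      // matching the policy chosen in ZooKeeperUtils above.
      val retryPolicy: RetryPolicy = new RetryNTimes(5, 2000)
      // Alternative: ExponentialBackoffRetry(baseSleepTimeMs, maxRetries)
      // val retryPolicy: RetryPolicy = new ExponentialBackoffRetry(1000, 5)
      val client = CuratorFrameworkFactory.builder()
        .connectString("localhost:2181") // assumed ZooKeeper address
        .retryPolicy(retryPolicy)
        .build()
      client.start()
      client.close()
    }
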
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/RunningFunction.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/RunningFunction.java
index ba33a23dd..4fc49914b 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/RunningFunction.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/RunningFunction.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 public interface RunningFunction extends Serializable {
 
     /**
-     * 是否running...
+     * Whether it is running...
      *
      * @return Boolean: isRunning
      */
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/SQLQueryFunction.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/SQLQueryFunction.java
index 194def802..17a266ae7 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/SQLQueryFunction.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/SQLQueryFunction.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
 @FunctionalInterface
 public interface SQLQueryFunction<T> extends Serializable {
     /**
-     * 获取要查询的SQL
+     * Get the SQL to query
      *
      * @param last: last one
      * @return String: sql
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/SQLResultFunction.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/SQLResultFunction.java
index 73b05d7bc..ff08c7ab6 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/SQLResultFunction.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/SQLResultFunction.java
@@ -23,7 +23,7 @@ import java.util.Map;
 @FunctionalInterface
 public interface SQLResultFunction<T> extends Serializable {
     /**
-     * 将查下结果以Map的方式返回,用户去实现转成对象.
+     * The query result is returned as a Map; the user implements the conversion into an object.
      *
      * @param map: sqlQuery result
      * @return Iterable: Iterable
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/StreamEnvConfigFunction.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/StreamEnvConfigFunction.java
index ac63fd4aa..da42db93c 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/StreamEnvConfigFunction.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/StreamEnvConfigFunction.java
@@ -25,7 +25,7 @@ import java.io.Serializable;
 @FunctionalInterface
 public interface StreamEnvConfigFunction extends Serializable {
     /**
-     * 用于初始化StreamExecutionEnvironment的时候,用于可以实现该函数,自定义要设置的参数...
+     * Invoked when the StreamExecutionEnvironment is initialized; implement this function to customize the parameters to be set...
      *
      * @param environment:   StreamExecutionEnvironment instance
      * @param parameterTool: ParameterTool
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/TableEnvConfigFunction.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/TableEnvConfigFunction.java
index 3a1b8ba8e..506ced9df 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/TableEnvConfigFunction.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/function/TableEnvConfigFunction.java
@@ -25,7 +25,7 @@ import java.io.Serializable;
 @FunctionalInterface
 public interface TableEnvConfigFunction extends Serializable {
     /**
-     * 用于初始化TableEnvironment的时候,用于可以实现该函数,自定义要设置的参数...
+     * Invoked when the TableEnvironment is initialized; implement this function to customize the parameters to be set...
      *
      * @param tableConfig:   flink tableConfig
      * @param parameterTool: parameterTool
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
index dfdac9e09..823adeec4 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
@@ -97,7 +97,7 @@ class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
         case ApiType.scala =>
           if (scalaRunningFunc()) {
             ctx.getCheckpointLock.synchronized {
-              //将上次(或者从checkpoint中恢复)的query查询对象返回用户,用户根据这个构建下次要查询的条件.
+              // Return the last query object (or the one recovered from a checkpoint) to the user, who builds the conditions for the next query from it.
               query = scalaQueryFunc(last)
               require(query != null && query.getTable != null, "[StreamPark] HBaseSource query and query's param table must not be null ")
               table = query.getTable(prop)
@@ -110,7 +110,7 @@ class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
         case ApiType.java =>
           if (javaRunningFunc.running()) {
             ctx.getCheckpointLock.synchronized {
-              //将上次(或者从checkpoint中恢复)的query查询对象返回用户,用户根据这个构建下次要查询的条件.
+              // Return the last query object (or the one recovered from a checkpoint) to the user, who builds the conditions for the next query from it.
               query = javaQueryFunc.query(last)
               require(query != null && query.getTable != null, "[StreamPark] HBaseSource query and query's param table must not be null ")
               table = query.getTable(prop)
@@ -145,7 +145,7 @@ class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
   }
 
   override def initializeState(context: FunctionInitializationContext): Unit = {
-    //从checkpoint中恢复...
+    // Recover from checkpoint...
     logInfo("HBaseSource snapshotState initialize")
     state = FlinkUtils.getUnionListState[R](context, OFFSETS_STATE_NAME)
     Try(state.get.head) match {
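
The recovery pattern above rests on Flink's operator union list state
(FlinkUtils.getUnionListState is a project helper around it). A minimal
sketch of the underlying Flink API, with an assumed state name and element
type (not taken from this commit):

    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

    // Sketch: keep the last offset in union list state so it can be
    // restored after a failure, as HBaseSourceFunction does with its query.
    class OffsetCheckpointSketch extends CheckpointedFunction {
      @transient private var state: ListState[java.lang.Long] = _
      private var lastOffset: Long = 0L

      override def initializeState(context: FunctionInitializationContext): Unit = {
        val descriptor = new ListStateDescriptor[java.lang.Long]("offset-state", classOf[java.lang.Long])
        state = context.getOperatorStateStore.getUnionListState(descriptor)
        // Recover from checkpoint: take the first restored value, if any.
        val it = state.get().iterator()
        if (context.isRestored && it.hasNext) lastOffset = it.next()
      }

      override def snapshotState(context: FunctionSnapshotContext): Unit = {
        state.clear()
        state.add(lastOffset)
      }
    }
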
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
index 55f7d43f1..f86aa9f3e 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
@@ -36,7 +36,7 @@ object JdbcSink {
 
   /**
    * @param ctx   : StreamingContext
-   * @param alias :    实例别名(用于区分多个不同的数据库实例...)
+   * @param alias : Instance alias (used to distinguish multiple database instances...)
    * @return
    */
   def apply(@(transient@param)
@@ -56,8 +56,8 @@ class JdbcSink(@(transient@param) ctx: StreamingContext,
   /**
    *
    * @param stream  : DataStream
-   * @param toSQLFn : 转换成SQL的函数,有用户提供.
-   * @tparam T : DataStream里的流的数据类型
+   * @param toSQLFn : Function that converts an element into a SQL statement, provided by the user.
+   * @tparam T : The data type of the elements in the DataStream
    * @return
    */
   def sink[T](stream: DataStream[T])(implicit toSQLFn: T => String): DataStreamSink[T] = {
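
The sink method above resolves an implicit T => String supplied by the
user. A brief usage sketch under assumed types (the Order case class and
the SQL text are illustrative, not from this commit):

    object JdbcSinkUsageSketch {
      case class Order(id: Long, amount: Double)

      // The user provides an implicit function rendering each element as a
      // SQL statement; JdbcSink.sink resolves it implicitly.
      implicit val toSQLFn: Order => String = order =>
        s"insert into t_order (id, amount) values (${order.id}, ${order.amount})"

      // assuming ctx: StreamingContext and orders: DataStream[Order]:
      // JdbcSink(ctx).sink(orders)
    }
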
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaJavaSink.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaJavaSink.java
index 403a1925d..efb946111 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaJavaSink.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaJavaSink.java
@@ -47,7 +47,7 @@ public class KafkaJavaSink<T> {
 
     public KafkaJavaSink(StreamingContext context) {
         this.context = context;
-        //默认partitioner为KafkaEqualityPartitioner
+        // The default partitioner is KafkaEqualityPartitioner
         partitioner = new KafkaEqualityPartitioner<T>(context.getParallelism());
     }
 
@@ -87,7 +87,7 @@ public class KafkaJavaSink<T> {
     }
 
     /**
-     * 设置要下沉的topic
+     * Set the topic to sink to
      *
      * @param topic: topic name
      * @return KafkaSink: KafkaSink instance
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
index 9b299f792..eebd4f812 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
@@ -144,23 +144,22 @@ object KafkaSource {
 
 class KafkaSource(@(transient@param) private[this] val ctx: StreamingContext, property: Properties = new Properties()) {
   /**
+    * Offset commit modes:<br/>
+    * &nbsp;&nbsp;How the Flink Kafka consumer commits offsets depends on whether checkpointing is enabled. <br/>
+    * &nbsp;&nbsp; 1) Checkpointing disabled: committing offsets relies on the Kafka client's auto commit.
+    * Set the enable.auto.commit and auto.commit.interval.ms parameters in the consumer properties,
+    * and offsets will be auto-committed to Kafka at a fixed interval. <br/>
+    * &nbsp;&nbsp; 2) Checkpointing enabled: the offsets consumed by the job are managed and made fault-tolerant by Flink in its state.
+    * Committing offsets to Kafka then mainly serves as external progress monitoring, to track the job's consumption position and lag in real time.
+    * In this case, set setCommitOffsetsOnCheckpoints to true so that offsets are committed to Kafka when a checkpoint succeeds.
+    * The commit interval then depends on the checkpoint interval
     *
-    * commit offset 方式:<br/>
-    * &nbsp;&nbsp;Flink kafka consumer commit offset 方式需要区分是否开启了 checkpoint。<br/>
-    * &nbsp;&nbsp; 1) checkpoint 关闭: commit offset 要依赖于 kafka 客户端的 auto commit。
-    * 需设置 enable.auto.commit,auto.commit.interval.ms 参数到 consumer properties,
-    * 就会按固定的时间间隔定期 auto commit offset 到 kafka。<br/>
-    * &nbsp;&nbsp; 2) checkpoint 开启: 这个时候作业消费的 offset 是 Flink 在 state 中自己管理和容错。
-    * 此时提交 offset 到 kafka,一般都是作为外部进度的监控,想实时知道作业消费的位置和 lag 情况。
-    * 此时需要 setCommitOffsetsOnCheckpoints 为 true 来设置当 checkpoint 成功时提交 offset 到 kafka。
-    * 此时 commit offset 的间隔就取决于 checkpoint 的间隔
+    * Get the DStream
     *
-    * 获取DStream 流
-    *
-    * @param topic        一组topic或者单个topic
-    * @param alias        别名,区分不同的kafka连接实例
+    * @param topic        a group of topics or a single topic
+    * @param alias        Alias to distinguish different kafka connection instances
     * @param deserializer DeserializationSchema
-    * @param strategy     Watermarks 策略
+    * @param strategy     Watermarks strategy
     * @tparam T
     */
   def getDataStream[T: TypeInformation](topic: java.io.Serializable = null,
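
A minimal sketch of the two commit modes described in the comment above,
written against Flink's FlinkKafkaConsumer directly (topic, group id, and
broker address are assumed placeholders):

    import java.util.Properties
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    object KafkaOffsetCommitSketch extends App {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      val props = new Properties()
      props.setProperty("bootstrap.servers", "localhost:9092") // assumed address
      props.setProperty("group.id", "demo-group")              // assumed group id

      // 1) Checkpointing disabled: rely on the Kafka client's auto commit.
      // props.setProperty("enable.auto.commit", "true")
      // props.setProperty("auto.commit.interval.ms", "5000")

      // 2) Checkpointing enabled: commit offsets when a checkpoint succeeds;
      // the commit interval then follows the checkpoint interval.
      env.enableCheckpointing(10000)
      val consumer = new FlinkKafkaConsumer[String]("demo-topic", new SimpleStringSchema(), props)
      consumer.setCommitOffsetsOnCheckpoints(true)
      val stream: DataStream[String] = env.addSource(consumer)
    }
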
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
index e19108478..bc0b1b4d9 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
@@ -20,9 +20,6 @@ package org.apache.streampark.flink.kubernetes.watcher
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.language.implicitConversions
 
-/**
- * auth: Al-assad
- */
 trait FlinkWatcher extends AutoCloseable {
 
   private[this] val started: AtomicBoolean = new AtomicBoolean(false)