You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/10/24 04:49:12 UTC
[inlong] branch master updated: [INLONG-9089][Agent] Add enums for task and instance (#9090)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e5ba995f75 [INLONG-9089][Agent] Add enums for task and instance (#9090)
e5ba995f75 is described below
commit e5ba995f754af402f2c915c40ac68b8f40a6ccdc
Author: justinwwhuang <hw...@163.com>
AuthorDate: Mon Oct 23 23:49:06 2023 -0500
[INLONG-9089][Agent] Add enums for task and instance (#9090)
---
.../inlong/agent/constant/TaskConstants.java | 218 +++++++++++++++++++++
.../inlong/common/enums/InstanceStateEnum.java | 52 +++++
.../apache/inlong/common/enums/TaskStateEnum.java | 55 ++++++
3 files changed, 325 insertions(+)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
new file mode 100755
index 0000000000..bbf95cea76
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+/**
+ * Basic config for a single job
+ */
+public class TaskConstants extends CommonConstants {
+
+ // job id
+ // public static final String JOB_ID = "job.id";
+ public static final String TASK_ID = "task.id";
+ public static final String INSTANCE_ID = "instance.id";
+ public static final String JOB_INSTANCE_ID = "job.instance.id";
+ public static final String INSTANCE_CREATE_TIME = "instance.createTime";
+ public static final String INSTANCE_MODIFY_TIME = "instance.modifyTime";
+ public static final String JOB_IP = "job.ip";
+ public static final String JOB_RETRY = "job.retry";
+ public static final String JOB_UUID = "job.uuid";
+ public static final String TASK_GROUP_ID = "task.groupId";
+ public static final String TASK_STREAM_ID = "task.streamId";
+
+ public static final String TASK_SOURCE = "task.source";
+ public static final String JOB_SOURCE_TYPE = "job.sourceType";
+
+ public static final String TASK_CHANNEL = "task.channel";
+ public static final String JOB_NAME = "job.name";
+ public static final String JOB_LINE_FILTER_PATTERN = "job.pattern";
+
+ public static final String DEFAULT_JOB_NAME = "default";
+ public static final String JOB_DESCRIPTION = "job.description";
+ public static final String DEFAULT_JOB_DESCRIPTION = "default job description";
+ public static final String DEFAULT_JOB_LINE_FILTER = "";
+ public static final String TASK_CLASS = "task.taskClass";
+ public static final String INSTANCE_CLASS = "task.instance.class";
+ public static final String JOB_FILE_TRIGGER = "job.fileTask.trigger";
+
+ // sink config
+ public static final String TASK_SINK = "task.sink";
+ public static final String JOB_PROXY_SEND = "job.proxySend";
+ public static final boolean DEFAULT_JOB_PROXY_SEND = false;
+ public static final String JOB_MQ_ClUSTERS = "job.mqClusters";
+ public static final String JOB_MQ_TOPIC = "job.topicInfo";
+ public static final String OFFSET = "offset";
+ public static final Long DEFAULT_OFFSET = -1L;
+ public static final String INODE_INFO = "inodeInfo";
+
+ // File job
+ public static final String TASK_DIR_FILTER_PATTERN = "task.fileTask.dir.pattern"; // deprecated
+ public static final String FILE_DIR_FILTER_PATTERNS = "task.fileTask.dir.patterns";
+ public static final String TASK_FILE_TIME_OFFSET = "task.fileTask.timeOffset";
+ public static final String TASK_FILE_MAX_WAIT = "task.fileTask.file.max.wait";
+ public static final String TASK_CYCLE_UNIT = "task.fileTask.cycleUnit";
+ public static final String TASK_FILE_TRIGGER_TYPE = "task.fileTask.collectType";
+ public static final String JOB_FILE_LINE_END_PATTERN = "job.fileTask.line.endPattern";
+ public static final String JOB_FILE_CONTENT_COLLECT_TYPE = "job.fileTask.contentCollectType";
+ public static final String JOB_FILE_META_ENV_LIST = "job.fileTask.envList";
+ public static final String JOB_FILE_META_FILTER_BY_LABELS = "job.fileTask.filterMetaByLabels";
+ public static final String JOB_FILE_PROPERTIES = "job.fileTask.properties";
+ public static final String JOB_FILE_DATA_SOURCE_COLUMN_SEPARATOR = "job.fileTask.dataSeparator";
+ public static final String JOB_FILE_MONITOR_INTERVAL = "job.fileTask.monitorInterval";
+ public static final String JOB_FILE_MONITOR_STATUS = "job.fileTask.monitorStatus";
+ public static final String JOB_FILE_MONITOR_EXPIRE = "job.fileTask.monitorExpire";
+ public static final String TASK_RETRY = "task.fileTask.retry";
+ public static final String TASK_START_TIME = "task.fileTask.startTime";
+ public static final String TASK_END_TIME = "task.fileTask.endTime";
+ public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
+
+ // Binlog job
+ public static final String JOB_DATABASE_USER = "job.binlogJob.user";
+ public static final String JOB_DATABASE_PASSWORD = "job.binlogJob.password";
+ public static final String JOB_DATABASE_HOSTNAME = "job.binlogJob.hostname";
+ public static final String JOB_TABLE_WHITELIST = "job.binlogJob.tableWhiteList";
+ public static final String JOB_DATABASE_WHITELIST = "job.binlogJob.databaseWhiteList";
+ public static final String JOB_DATABASE_OFFSETS = "job.binlogJob.offsets";
+ public static final String JOB_DATABASE_OFFSET_FILENAME = "job.binlogJob.offset.filename";
+
+ public static final String JOB_DATABASE_SERVER_TIME_ZONE = "job.binlogJob.serverTimezone";
+ public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.binlogJob.offset.intervalMs";
+
+ public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.binlogJob.history.filename";
+ public static final String JOB_DATABASE_INCLUDE_SCHEMA_CHANGES = "job.binlogJob.schema";
+ public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.binlogJob.snapshot.mode";
+ public static final String JOB_DATABASE_HISTORY_MONITOR_DDL = "job.binlogJob.ddl";
+ public static final String JOB_DATABASE_PORT = "job.binlogJob.port";
+
+ // Kafka job
+ public static final String JOB_KAFKA_TOPIC = "job.kafkaJob.topic";
+ public static final String JOB_KAFKA_BOOTSTRAP_SERVERS = "job.kafkaJob.bootstrap.servers";
+ public static final String JOB_KAFKA_GROUP_ID = "job.kafkaJob.group.id";
+ public static final String JOB_KAFKA_RECORD_SPEED_LIMIT = "job.kafkaJob.recordSpeed.limit";
+ public static final String JOB_KAFKA_BYTE_SPEED_LIMIT = "job.kafkaJob.byteSpeed.limit";
+ public static final String JOB_KAFKA_OFFSET = "job.kafkaJob.partition.offset";
+ public static final String JOB_KAFKA_READ_TIMEOUT = "job.kafkaJob.read.timeout";
+ public static final String JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET = "job.kafkaJob.autoOffsetReset";
+
+ public static final String JOB_MONGO_HOSTS = "job.mongoJob.hosts";
+ public static final String JOB_MONGO_USER = "job.mongoJob.user";
+ public static final String JOB_MONGO_PASSWORD = "job.mongoJob.password";
+ public static final String JOB_MONGO_DATABASE_INCLUDE_LIST = "job.mongoJob.databaseIncludeList";
+ public static final String JOB_MONGO_DATABASE_EXCLUDE_LIST = "job.mongoJob.databaseExcludeList";
+ public static final String JOB_MONGO_COLLECTION_INCLUDE_LIST = "job.mongoJob.collectionIncludeList";
+ public static final String JOB_MONGO_COLLECTION_EXCLUDE_LIST = "job.mongoJob.collectionExcludeList";
+ public static final String JOB_MONGO_FIELD_EXCLUDE_LIST = "job.mongoJob.fieldExcludeList";
+ public static final String JOB_MONGO_SNAPSHOT_MODE = "job.mongoJob.snapshotMode";
+ public static final String JOB_MONGO_CAPTURE_MODE = "job.mongoJob.captureMode";
+ public static final String JOB_MONGO_QUEUE_SIZE = "job.mongoJob.queueSize";
+ public static final String JOB_MONGO_STORE_HISTORY_FILENAME = "job.mongoJob.history.filename";
+ public static final String JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE = "job.mongoJob.offset.specificOffsetFile";
+ public static final String JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS = "job.mongoJob.offset.specificOffsetPos";
+ public static final String JOB_MONGO_OFFSETS = "job.mongoJob.offsets";
+ public static final String JOB_MONGO_CONNECT_TIMEOUT_MS = "job.mongoJob.connectTimeoutInMs";
+ public static final String JOB_MONGO_CURSOR_MAX_AWAIT = "job.mongoJob.cursorMaxAwaitTimeInMs";
+ public static final String JOB_MONGO_SOCKET_TIMEOUT = "job.mongoJob.socketTimeoutInMs";
+ public static final String JOB_MONGO_SELECTION_TIMEOUT = "job.mongoJob.selectionTimeoutInMs";
+ public static final String JOB_MONGO_FIELD_RENAMES = "job.mongoJob.fieldRenames";
+ public static final String JOB_MONGO_MEMBERS_DISCOVER = "job.mongoJob.membersAutoDiscover";
+ public static final String JOB_MONGO_CONNECT_MAX_ATTEMPTS = "job.mongoJob.connectMaxAttempts";
+ public static final String JOB_MONGO_BACKOFF_MAX_DELAY = "job.mongoJob.connectBackoffMaxDelayInMs";
+ public static final String JOB_MONGO_BACKOFF_INITIAL_DELAY = "job.mongoJob.connectBackoffInitialDelayInMs";
+ public static final String JOB_MONGO_INITIAL_SYNC_MAX_THREADS = "job.mongoJob.initialSyncMaxThreads";
+ public static final String JOB_MONGO_SSL_INVALID_HOSTNAME_ALLOWED = "job.mongoJob.sslInvalidHostnameAllowed";
+ public static final String JOB_MONGO_SSL_ENABLE = "job.mongoJob.sslEnabled";
+ public static final String JOB_MONGO_POLL_INTERVAL = "job.mongoJob.pollIntervalInMs";
+
+ public static final Long JOB_KAFKA_DEFAULT_OFFSET = 0L;
+
+ // job type, delete/add
+ public static final String JOB_TYPE = "job.type";
+
+ public static final String JOB_CHECKPOINT = "job.checkpoint";
+
+ public static final String DEFAULT_JOB_FILE_TIME_OFFSET = "0d";
+
+ // time in min
+ public static final int DEFAULT_JOB_FILE_MAX_WAIT_TIME = 1;
+
+ public static final String JOB_READ_WAIT_TIMEOUT = "job.file.read.wait";
+
+ public static final String JOB_ID_PREFIX = "job_";
+
+ public static final String SQL_JOB_ID = "sql_job_id";
+
+ public static final String JOB_STORE_TIME = "job.store.time";
+
+ public static final String JOB_OP = "job.op";
+
+ public static final String JOB_STATE = "job.state";
+
+ public static final String TASK_STATE = "task.state";
+
+ public static final String INSTANCE_STATE = "instance.state";
+
+ public static final String LAST_UPDATE_TIME = "lastUpdateTime";
+
+ public static final String TRIGGER_ONLY_ONE_JOB = "job.standalone"; // TODO:delete it
+
+ // field splitter
+ public static final String JOB_FIELD_SPLITTER = "job.splitter";
+
+ // job delivery time
+ public static final String JOB_DELIVERY_TIME = "job.deliveryTime";
+
+ // job time reading file
+ public static final String JOB_DATA_TIME = "job.dataTime";
+
+ // job of the number of seconds to wait before starting the task
+ public static final String JOB_TASK_BEGIN_WAIT_SECONDS = "job.taskWaitSeconds";
+
+ /**
+ * when job is retried, the retry time should be provided
+ */
+ public static final String JOB_RETRY_TIME = "job.retryTime";
+
+ /**
+ * delimiter to split offset for different task
+ */
+ public static final String JOB_OFFSET_DELIMITER = "_";
+
+ /**
+ * delimiter to split all partition offset for all kafka tasks
+ */
+ public static final String JOB_KAFKA_PARTITION_OFFSET_DELIMITER = "#";
+
+ /**
+ * sync send data when sending to DataProxy
+ */
+ public static final int SYNC_SEND_OPEN = 1;
+
+ public static final String INTERVAL_MILLISECONDS = "1000";
+
+ /**
+ * monitor switch, 1 true and 0 false
+ */
+ public static final String JOB_FILE_MONITOR_DEFAULT_STATUS = "1";
+
+ /**
+ * monitor expire time and the time in milliseconds.
+ * default value is -1 and stand for not expire time.
+ */
+ public static final String JOB_FILE_MONITOR_DEFAULT_EXPIRE = "-1";
+
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/InstanceStateEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/InstanceStateEnum.java
new file mode 100644
index 0000000000..e946c74f66
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/InstanceStateEnum.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.enums;
+
+/**
+ * Enum of instance state.
+ */
+public enum InstanceStateEnum {
+
+ DEFAULT(0),
+ FINISHED(1),
+ DELETE(2);
+
+ private final int state;
+
+ InstanceStateEnum(int state) {
+ this.state = state;
+ }
+
+ public static InstanceStateEnum getTaskState(int state) {
+ switch (state) {
+ case 0:
+ return DEFAULT;
+ case 1:
+ return FINISHED;
+ case 2:
+ return DELETE;
+ default:
+ throw new RuntimeException("Unsupported instance state " + state);
+ }
+ }
+
+ public int getState() {
+ return state;
+ }
+
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java
new file mode 100644
index 0000000000..6401fa8ffb
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.enums;
+
+/**
+ * Enum of task state.
+ */
+public enum TaskStateEnum {
+
+ NEW(0),
+ RUNNING(1),
+ FROZEN(2),
+ FINISH(3);
+
+ private final int state;
+
+ TaskStateEnum(int state) {
+ this.state = state;
+ }
+
+ public static TaskStateEnum getTaskState(int state) {
+ switch (state) {
+ case 0:
+ return NEW;
+ case 1:
+ return RUNNING;
+ case 2:
+ return FROZEN;
+ case 3:
+ return FINISH;
+ default:
+ throw new RuntimeException("Unsupported task state " + state);
+ }
+ }
+
+ public int getType() {
+ return state;
+ }
+
+}