You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/10/24 04:49:12 UTC

[inlong] branch master updated: [INLONG-9089][Agent] Add enums for task and instance (#9090)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e5ba995f75 [INLONG-9089][Agent] Add enums for task and instance (#9090)
e5ba995f75 is described below

commit e5ba995f754af402f2c915c40ac68b8f40a6ccdc
Author: justinwwhuang <hw...@163.com>
AuthorDate: Mon Oct 23 23:49:06 2023 -0500

    [INLONG-9089][Agent] Add enums for task and instance (#9090)
---
 .../inlong/agent/constant/TaskConstants.java       | 218 +++++++++++++++++++++
 .../inlong/common/enums/InstanceStateEnum.java     |  52 +++++
 .../apache/inlong/common/enums/TaskStateEnum.java  |  55 ++++++
 3 files changed, 325 insertions(+)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
new file mode 100755
index 0000000000..bbf95cea76
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+/**
+ * Config key names (mostly job.*, task.* and instance.* prefixed) and default values for a single job
+ */
+public class TaskConstants extends CommonConstants {
+
+    // task / instance identity keys (the legacy job id key below is commented out)
+    // public static final String JOB_ID = "job.id";
+    public static final String TASK_ID = "task.id";
+    public static final String INSTANCE_ID = "instance.id";
+    public static final String JOB_INSTANCE_ID = "job.instance.id";
+    public static final String INSTANCE_CREATE_TIME = "instance.createTime";
+    public static final String INSTANCE_MODIFY_TIME = "instance.modifyTime";
+    public static final String JOB_IP = "job.ip";
+    public static final String JOB_RETRY = "job.retry";
+    public static final String JOB_UUID = "job.uuid";
+    public static final String TASK_GROUP_ID = "task.groupId";
+    public static final String TASK_STREAM_ID = "task.streamId";
+
+    public static final String TASK_SOURCE = "task.source";
+    public static final String JOB_SOURCE_TYPE = "job.sourceType";
+
+    public static final String TASK_CHANNEL = "task.channel";
+    public static final String JOB_NAME = "job.name";
+    public static final String JOB_LINE_FILTER_PATTERN = "job.pattern";
+
+    public static final String DEFAULT_JOB_NAME = "default";
+    public static final String JOB_DESCRIPTION = "job.description";
+    public static final String DEFAULT_JOB_DESCRIPTION = "default job description";
+    public static final String DEFAULT_JOB_LINE_FILTER = "";
+    public static final String TASK_CLASS = "task.taskClass";
+    public static final String INSTANCE_CLASS = "task.instance.class";
+    public static final String JOB_FILE_TRIGGER = "job.fileTask.trigger";
+
+    // sink config
+    public static final String TASK_SINK = "task.sink";
+    public static final String JOB_PROXY_SEND = "job.proxySend";
+    public static final boolean DEFAULT_JOB_PROXY_SEND = false;
+    public static final String JOB_MQ_ClUSTERS = "job.mqClusters"; // NOTE(review): lowercase 'l' in the name looks like a typo; renaming would break callers
+    public static final String JOB_MQ_TOPIC = "job.topicInfo";
+    public static final String OFFSET = "offset";
+    public static final Long DEFAULT_OFFSET = -1L;
+    public static final String INODE_INFO = "inodeInfo";
+
+    // File job
+    public static final String TASK_DIR_FILTER_PATTERN = "task.fileTask.dir.pattern"; // deprecated (presumably superseded by FILE_DIR_FILTER_PATTERNS - confirm)
+    public static final String FILE_DIR_FILTER_PATTERNS = "task.fileTask.dir.patterns";
+    public static final String TASK_FILE_TIME_OFFSET = "task.fileTask.timeOffset";
+    public static final String TASK_FILE_MAX_WAIT = "task.fileTask.file.max.wait";
+    public static final String TASK_CYCLE_UNIT = "task.fileTask.cycleUnit";
+    public static final String TASK_FILE_TRIGGER_TYPE = "task.fileTask.collectType";
+    public static final String JOB_FILE_LINE_END_PATTERN = "job.fileTask.line.endPattern";
+    public static final String JOB_FILE_CONTENT_COLLECT_TYPE = "job.fileTask.contentCollectType";
+    public static final String JOB_FILE_META_ENV_LIST = "job.fileTask.envList";
+    public static final String JOB_FILE_META_FILTER_BY_LABELS = "job.fileTask.filterMetaByLabels";
+    public static final String JOB_FILE_PROPERTIES = "job.fileTask.properties";
+    public static final String JOB_FILE_DATA_SOURCE_COLUMN_SEPARATOR = "job.fileTask.dataSeparator";
+    public static final String JOB_FILE_MONITOR_INTERVAL = "job.fileTask.monitorInterval";
+    public static final String JOB_FILE_MONITOR_STATUS = "job.fileTask.monitorStatus";
+    public static final String JOB_FILE_MONITOR_EXPIRE = "job.fileTask.monitorExpire";
+    public static final String TASK_RETRY = "task.fileTask.retry";
+    public static final String TASK_START_TIME = "task.fileTask.startTime";
+    public static final String TASK_END_TIME = "task.fileTask.endTime";
+    public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
+
+    // Binlog job
+    public static final String JOB_DATABASE_USER = "job.binlogJob.user";
+    public static final String JOB_DATABASE_PASSWORD = "job.binlogJob.password";
+    public static final String JOB_DATABASE_HOSTNAME = "job.binlogJob.hostname";
+    public static final String JOB_TABLE_WHITELIST = "job.binlogJob.tableWhiteList";
+    public static final String JOB_DATABASE_WHITELIST = "job.binlogJob.databaseWhiteList";
+    public static final String JOB_DATABASE_OFFSETS = "job.binlogJob.offsets";
+    public static final String JOB_DATABASE_OFFSET_FILENAME = "job.binlogJob.offset.filename";
+
+    public static final String JOB_DATABASE_SERVER_TIME_ZONE = "job.binlogJob.serverTimezone";
+    public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.binlogJob.offset.intervalMs";
+
+    public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.binlogJob.history.filename";
+    public static final String JOB_DATABASE_INCLUDE_SCHEMA_CHANGES = "job.binlogJob.schema";
+    public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.binlogJob.snapshot.mode";
+    public static final String JOB_DATABASE_HISTORY_MONITOR_DDL = "job.binlogJob.ddl";
+    public static final String JOB_DATABASE_PORT = "job.binlogJob.port";
+
+    // Kafka job
+    public static final String JOB_KAFKA_TOPIC = "job.kafkaJob.topic";
+    public static final String JOB_KAFKA_BOOTSTRAP_SERVERS = "job.kafkaJob.bootstrap.servers";
+    public static final String JOB_KAFKA_GROUP_ID = "job.kafkaJob.group.id";
+    public static final String JOB_KAFKA_RECORD_SPEED_LIMIT = "job.kafkaJob.recordSpeed.limit";
+    public static final String JOB_KAFKA_BYTE_SPEED_LIMIT = "job.kafkaJob.byteSpeed.limit";
+    public static final String JOB_KAFKA_OFFSET = "job.kafkaJob.partition.offset";
+    public static final String JOB_KAFKA_READ_TIMEOUT = "job.kafkaJob.read.timeout";
+    public static final String JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET = "job.kafkaJob.autoOffsetReset";
+    // Mongo job
+    public static final String JOB_MONGO_HOSTS = "job.mongoJob.hosts";
+    public static final String JOB_MONGO_USER = "job.mongoJob.user";
+    public static final String JOB_MONGO_PASSWORD = "job.mongoJob.password";
+    public static final String JOB_MONGO_DATABASE_INCLUDE_LIST = "job.mongoJob.databaseIncludeList";
+    public static final String JOB_MONGO_DATABASE_EXCLUDE_LIST = "job.mongoJob.databaseExcludeList";
+    public static final String JOB_MONGO_COLLECTION_INCLUDE_LIST = "job.mongoJob.collectionIncludeList";
+    public static final String JOB_MONGO_COLLECTION_EXCLUDE_LIST = "job.mongoJob.collectionExcludeList";
+    public static final String JOB_MONGO_FIELD_EXCLUDE_LIST = "job.mongoJob.fieldExcludeList";
+    public static final String JOB_MONGO_SNAPSHOT_MODE = "job.mongoJob.snapshotMode";
+    public static final String JOB_MONGO_CAPTURE_MODE = "job.mongoJob.captureMode";
+    public static final String JOB_MONGO_QUEUE_SIZE = "job.mongoJob.queueSize";
+    public static final String JOB_MONGO_STORE_HISTORY_FILENAME = "job.mongoJob.history.filename";
+    public static final String JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE = "job.mongoJob.offset.specificOffsetFile";
+    public static final String JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS = "job.mongoJob.offset.specificOffsetPos";
+    public static final String JOB_MONGO_OFFSETS = "job.mongoJob.offsets";
+    public static final String JOB_MONGO_CONNECT_TIMEOUT_MS = "job.mongoJob.connectTimeoutInMs";
+    public static final String JOB_MONGO_CURSOR_MAX_AWAIT = "job.mongoJob.cursorMaxAwaitTimeInMs";
+    public static final String JOB_MONGO_SOCKET_TIMEOUT = "job.mongoJob.socketTimeoutInMs";
+    public static final String JOB_MONGO_SELECTION_TIMEOUT = "job.mongoJob.selectionTimeoutInMs";
+    public static final String JOB_MONGO_FIELD_RENAMES = "job.mongoJob.fieldRenames";
+    public static final String JOB_MONGO_MEMBERS_DISCOVER = "job.mongoJob.membersAutoDiscover";
+    public static final String JOB_MONGO_CONNECT_MAX_ATTEMPTS = "job.mongoJob.connectMaxAttempts";
+    public static final String JOB_MONGO_BACKOFF_MAX_DELAY = "job.mongoJob.connectBackoffMaxDelayInMs";
+    public static final String JOB_MONGO_BACKOFF_INITIAL_DELAY = "job.mongoJob.connectBackoffInitialDelayInMs";
+    public static final String JOB_MONGO_INITIAL_SYNC_MAX_THREADS = "job.mongoJob.initialSyncMaxThreads";
+    public static final String JOB_MONGO_SSL_INVALID_HOSTNAME_ALLOWED = "job.mongoJob.sslInvalidHostnameAllowed";
+    public static final String JOB_MONGO_SSL_ENABLE = "job.mongoJob.sslEnabled";
+    public static final String JOB_MONGO_POLL_INTERVAL = "job.mongoJob.pollIntervalInMs";
+
+    public static final Long JOB_KAFKA_DEFAULT_OFFSET = 0L;
+
+    // job type, delete/add
+    public static final String JOB_TYPE = "job.type";
+
+    public static final String JOB_CHECKPOINT = "job.checkpoint";
+
+    public static final String DEFAULT_JOB_FILE_TIME_OFFSET = "0d";
+
+    // default max wait time for a file, in minutes
+    public static final int DEFAULT_JOB_FILE_MAX_WAIT_TIME = 1;
+
+    public static final String JOB_READ_WAIT_TIMEOUT = "job.file.read.wait";
+
+    public static final String JOB_ID_PREFIX = "job_";
+
+    public static final String SQL_JOB_ID = "sql_job_id";
+
+    public static final String JOB_STORE_TIME = "job.store.time";
+
+    public static final String JOB_OP = "job.op";
+
+    public static final String JOB_STATE = "job.state";
+
+    public static final String TASK_STATE = "task.state";
+
+    public static final String INSTANCE_STATE = "instance.state";
+
+    public static final String LAST_UPDATE_TIME = "lastUpdateTime";
+
+    public static final String TRIGGER_ONLY_ONE_JOB = "job.standalone"; // TODO: delete it
+
+    // field splitter
+    public static final String JOB_FIELD_SPLITTER = "job.splitter";
+
+    // job delivery time
+    public static final String JOB_DELIVERY_TIME = "job.deliveryTime";
+
+    // job time reading file
+    public static final String JOB_DATA_TIME = "job.dataTime";
+
+    // number of seconds to wait before starting the task
+    public static final String JOB_TASK_BEGIN_WAIT_SECONDS = "job.taskWaitSeconds";
+
+    /**
+     * when a job is retried, the retry time should be provided under this key
+     */
+    public static final String JOB_RETRY_TIME = "job.retryTime";
+
+    /**
+     * delimiter used to split the offsets of different tasks
+     */
+    public static final String JOB_OFFSET_DELIMITER = "_";
+
+    /**
+     * delimiter used to split the partition offsets of all kafka tasks
+     */
+    public static final String JOB_KAFKA_PARTITION_OFFSET_DELIMITER = "#";
+
+    /**
+     * value meaning "send data synchronously" when sending to DataProxy
+     */
+    public static final int SYNC_SEND_OPEN = 1;
+
+    public static final String INTERVAL_MILLISECONDS = "1000";
+
+    /**
+     * default value of the monitor switch: 1 is true and 0 is false
+     */
+    public static final String JOB_FILE_MONITOR_DEFAULT_STATUS = "1";
+
+    /**
+     * default monitor expire time, in milliseconds.
+     * the default value is -1, which stands for no expire time.
+     */
+    public static final String JOB_FILE_MONITOR_DEFAULT_EXPIRE = "-1";
+
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/InstanceStateEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/InstanceStateEnum.java
new file mode 100644
index 0000000000..e946c74f66
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/InstanceStateEnum.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.enums;
+
+/**
+ * Enum of instance state; each constant carries an int code: DEFAULT = 0, FINISHED = 1, DELETE = 2.
+ */
+public enum InstanceStateEnum {
+
+    DEFAULT(0),
+    FINISHED(1),
+    DELETE(2);
+
+    private final int state; // int code of this constant
+
+    InstanceStateEnum(int state) {
+        this.state = state;
+    }
+
+    public static InstanceStateEnum getTaskState(int state) { // maps an int code to its constant; NOTE(review): "Task" in the name is misleading, this returns an instance state
+        switch (state) {
+            case 0:
+                return DEFAULT;
+            case 1:
+                return FINISHED;
+            case 2:
+                return DELETE;
+            default:
+                throw new RuntimeException("Unsupported instance state " + state); // any code other than 0, 1 or 2 is rejected
+        }
+    }
+
+    public int getState() { // returns the int code of this constant
+        return state;
+    }
+
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java
new file mode 100644
index 0000000000..6401fa8ffb
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.enums;
+
+/**
+ * Enum of task state; each constant carries an int code: NEW = 0, RUNNING = 1, FROZEN = 2, FINISH = 3.
+ */
+public enum TaskStateEnum {
+
+    NEW(0),
+    RUNNING(1),
+    FROZEN(2),
+    FINISH(3);
+
+    private final int state; // int code of this constant
+
+    TaskStateEnum(int state) {
+        this.state = state;
+    }
+
+    public static TaskStateEnum getTaskState(int state) { // maps an int code to its constant
+        switch (state) {
+            case 0:
+                return NEW;
+            case 1:
+                return RUNNING;
+            case 2:
+                return FROZEN;
+            case 3:
+                return FINISH;
+            default:
+                throw new RuntimeException("Unsupported task state " + state); // any code other than 0..3 is rejected
+        }
+    }
+
+    public int getType() { // returns the int state code; NOTE(review): named getType although it returns `state` (InstanceStateEnum names this accessor getState)
+        return state;
+    }
+
+}
+}