You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/11/05 14:19:33 UTC

[inlong] branch master updated (a3d9d597b -> d6874a8b8)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


    omit a3d9d597b [INLONG-6401][Sort] Bugfix:Schema update stuck in dead loop cause (#6407)
    omit d4d4dd5b4 [INLONG-6332][Agent] Fix reboot will reset file position error (#6333)
    omit 9228a22d6 [INLONG-6379][Sort] Bugfix:iceberg miss metric data in multiple sink (#6381)
     new 7617fa007 [INLONG-6379][Sort] Iceberg misses metric data in multiple sink scenes (#6381)
     new 1c790a399 [INLONG-6332][Agent] Fix reboot will reset file position error (#6333)
     new d6874a8b8 [INLONG-6401][Sort] Schema update causes stack overflow in multiple sink scenes (#6407)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (`git push --force`), producing a
repository history like this:

 * -- * -- B -- O -- O -- O   (a3d9d597b)
            \
             N -- N -- N   refs/heads/master (d6874a8b8)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[inlong] 03/03: [INLONG-6401][Sort] Schema update causes stack overflow in multiple sink scenes (#6407)

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d6874a8b8c363a71d44182db626c62322ad02e41
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Sat Nov 5 15:57:58 2022 +0800

    [INLONG-6401][Sort] Schema update causes stack overflow in multiple sink scenes (#6407)
---
 .../sink/multiple/DynamicSchemaHandleOperator.java | 30 +++++++++++-----------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index e7fe68127..a0a9092c6 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -229,10 +229,12 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
         Transaction transaction = table.newTransaction();
         if (table.schema().sameSchema(oldSchema)) {
             List<TableChange> tableChanges = SchemaChangeUtils.diffSchema(oldSchema, newSchema);
-            if (canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
-                SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
-                LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
+            if (!canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
+                // If can not handle this schema update, should not push data into next operator
+                return;
             }
+            SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
+            LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
         }
         transaction.commitTransaction();
         handleSchemaInfoEvent(tableId, table.schema());
@@ -270,22 +272,20 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
     private boolean canHandleWithSchemaUpdatePolicy(TableIdentifier tableId, List<TableChange> tableChanges) {
         boolean canHandle = true;
         for (TableChange tableChange : tableChanges) {
-            if (tableChange instanceof AddColumn) {
-                canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-                        multipleSinkOption.getSchemaUpdatePolicy());
-            } else {
-                if (MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-                        multipleSinkOption.getSchemaUpdatePolicy())) {
-                    LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
-                            tableId, tableChange);
-                }
+            canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
+                    multipleSinkOption.getSchemaUpdatePolicy());
+            if (!(tableChange instanceof AddColumn)) {
                 // todo:currently iceberg can only handle addColumn, so always return false
+                LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
+                        tableId, tableChange);
                 canHandle = false;
             }
+            if (!canHandle) {
+                blacklist.add(tableId);
+                break;
+            }
         }
-        if (!canHandle) {
-            blacklist.add(tableId);
-        }
+
         return canHandle;
     }
 }


[inlong] 01/03: [INLONG-6379][Sort] Iceberg misses metric data in multiple sink scenes (#6381)

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 7617fa00748b5e9d8fa1d6a7998885f461e91b60
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Fri Nov 4 16:34:50 2022 +0800

    [INLONG-6379][Sort] Iceberg misses metric data in multiple sink scenes (#6381)
---
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  3 +-
 .../sink/multiple/IcebergMultipleStreamWriter.java | 59 +++++++++++++++++++++-
 2 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index bb7498650..25f9e963b 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -528,7 +528,8 @@ public class FlinkSink {
                     .setParallelism(parallelism);
 
             IcebergProcessOperator streamWriter =
-                    new IcebergProcessOperator(new IcebergMultipleStreamWriter(appendMode, catalogLoader));
+                    new IcebergProcessOperator(new IcebergMultipleStreamWriter(
+                            appendMode, catalogLoader, inlongMetric, auditHostAndPorts));
             SingleOutputStreamOperator<MultipleWriteResult> writerStream = routeStream
                     .transform(operatorName(ICEBERG_MULTIPLE_STREAM_WRITER_NAME),
                             TypeInformation.of(IcebergProcessOperator.class),
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 4c3fb0045..617eb6d69 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -18,6 +18,10 @@
 
 package org.apache.inlong.sort.iceberg.sink.multiple;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -34,10 +38,16 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.sink.TaskWriterFactory;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.util.HashMap;
 import java.util.List;
@@ -52,6 +62,9 @@ import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 
 /**
  * Iceberg writer that can distinguish different sink tables and route and distribute data into different
@@ -70,9 +83,23 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
     private transient Map<TableIdentifier, Schema> multipleSchemas;
     private transient FunctionInitializationContext functionInitializationContext;
 
-    public IcebergMultipleStreamWriter(boolean appendMode, CatalogLoader catalogLoader) {
+    // metric
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+    @Nullable
+    private transient SinkMetricData metricData;
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
+
+    public IcebergMultipleStreamWriter(
+            boolean appendMode,
+            CatalogLoader catalogLoader,
+            String inlongMetric,
+            String auditHostAndPorts) {
         this.appendMode = appendMode;
         this.catalogLoader = catalogLoader;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
     }
 
     @Override
@@ -81,6 +108,18 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
         this.multipleWriters = new HashMap<>();
         this.multipleTables = new HashMap<>();
         this.multipleSchemas = new HashMap<>();
+
+        // Initialize metric
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
     }
 
     @Override
@@ -185,11 +224,29 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
         for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
             entry.getValue().snapshotState(context);
         }
+
+        // metric
+        if (metricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
     }
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
         this.functionInitializationContext = context;
+
+        // init metric state
+        if (this.inlongMetric != null) {
+            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                    })));
+        }
+        if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+        }
     }
 
     private boolean isSchemaUpdate(RecordWithSchema recordWithSchema) {


[inlong] 02/03: [INLONG-6332][Agent] Fix reboot will reset file position error (#6333)

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 1c790a399ea7bb9e964d8215e7d65584aba53705
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Fri Nov 4 21:18:58 2022 +0800

    [INLONG-6332][Agent] Fix reboot will reset file position error (#6333)
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../agent/core/task/TaskPositionManager.java       |   2 +
 .../sources/reader/file/MonitorTextFile.java       | 101 ++++++++++++---------
 2 files changed, 60 insertions(+), 43 deletions(-)

diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
index c3bfa0abe..c5b944210 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
@@ -139,6 +139,8 @@ public class TaskPositionManager extends AbstractDaemon {
         ConcurrentHashMap<String, Long> positionTemp = new ConcurrentHashMap<>();
         ConcurrentHashMap<String, Long> position = jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
         if (position == null) {
+            JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId);
+            positionTemp.put(sourcePath, jobProfile.getLong(sourcePath + POSITION_SUFFIX, 0));
             position = positionTemp;
         }
         Long beforePosition = position.getOrDefault(sourcePath, 0L);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
index d8160bd8b..1f5958cdf 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -21,10 +21,10 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.attribute.BasicFileAttributes;
-import java.util.Objects;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -35,26 +35,28 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_EXP
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_INTERVAL;
 
 /**
- * monitor files
+ * Monitor for text files
  */
 public final class MonitorTextFile {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(MonitorTextFile.class);
-    private static volatile MonitorTextFile monitorTextFile = null;
     // monitor thread pool
-    private final ThreadPoolExecutor runningPool = new ThreadPoolExecutor(
+    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
             60L, TimeUnit.SECONDS,
             new SynchronousQueue<>(),
             new AgentThreadFactory("monitor-file"));
 
+    private static volatile MonitorTextFile monitorTextFile = null;
+
     private MonitorTextFile() {
 
     }
 
     /**
-     * Mode of singleton
-     * @return MonitorTextFile instance
+     * Get a singleton instance of MonitorTextFile.
+     *
+     * @return monitor text file instance
      */
     public static MonitorTextFile getInstance() {
         if (monitorTextFile == null) {
@@ -68,37 +70,35 @@ public final class MonitorTextFile {
     }
 
     public void monitor(FileReaderOperator fileReaderOperator, TextFileReader textFileReader) {
-        MonitorEventRunnable monitorEvent = new MonitorEventRunnable(fileReaderOperator, textFileReader);
-        runningPool.execute(monitorEvent);
+        EXECUTOR_SERVICE.execute(new MonitorEventRunnable(fileReaderOperator, textFileReader));
     }
 
     /**
-     * monitor file event
+     * Runnable for monitor the file event
      */
-    private class MonitorEventRunnable implements Runnable {
+    private static class MonitorEventRunnable implements Runnable {
 
         private static final int WAIT_TIME = 30;
         private final FileReaderOperator fileReaderOperator;
         private final TextFileReader textFileReader;
         private final Long interval;
         private final long startTime = System.currentTimeMillis();
+        private long lastFlushTime = System.currentTimeMillis();
         private String path;
-        /**
-         * the last modify time of the file
-         */
+
+        // the last modify time of the file
         private BasicFileAttributes attributesBefore;
 
-        public MonitorEventRunnable(FileReaderOperator fileReaderOperator, TextFileReader textFileReader) {
-            this.fileReaderOperator = fileReaderOperator;
+        public MonitorEventRunnable(FileReaderOperator readerOperator, TextFileReader textFileReader) {
+            this.fileReaderOperator = readerOperator;
             this.textFileReader = textFileReader;
-            this.interval = Long
-                    .parseLong(fileReaderOperator.jobConf.get(JOB_FILE_MONITOR_INTERVAL, INTERVAL_MILLISECONDS));
+            this.interval = Long.parseLong(
+                    readerOperator.jobConf.get(JOB_FILE_MONITOR_INTERVAL, INTERVAL_MILLISECONDS));
             try {
-                this.attributesBefore = Files
-                        .readAttributes(fileReaderOperator.file.toPath(), BasicFileAttributes.class);
-                this.path = this.fileReaderOperator.file.getCanonicalPath();
+                this.attributesBefore = Files.readAttributes(readerOperator.file.toPath(), BasicFileAttributes.class);
+                this.path = readerOperator.file.getCanonicalPath();
             } catch (IOException e) {
-                LOGGER.error("get {} last modify time error:", fileReaderOperator.file.getName(), e);
+                LOGGER.error("get {} last modify time error:", readerOperator.file.getName(), e);
             }
         }
 
@@ -106,10 +106,10 @@ public final class MonitorTextFile {
         public void run() {
             try {
                 TimeUnit.SECONDS.sleep(WAIT_TIME);
-                LOGGER.info("start {} monitor", this.fileReaderOperator.file.getAbsolutePath());
-                while (!this.fileReaderOperator.finished) {
-                    long expireTime = Long.parseLong(fileReaderOperator.jobConf
-                            .get(JOB_FILE_MONITOR_EXPIRE, JOB_FILE_MONITOR_DEFAULT_EXPIRE));
+                LOGGER.info("start {} monitor", fileReaderOperator.file.getAbsolutePath());
+                while (!fileReaderOperator.finished) {
+                    long expireTime = Long.parseLong(
+                            fileReaderOperator.jobConf.get(JOB_FILE_MONITOR_EXPIRE, JOB_FILE_MONITOR_DEFAULT_EXPIRE));
                     long currentTime = System.currentTimeMillis();
                     if (expireTime != Long.parseLong(JOB_FILE_MONITOR_DEFAULT_EXPIRE)
                             && currentTime - this.startTime > expireTime) {
@@ -119,39 +119,54 @@ public final class MonitorTextFile {
                     TimeUnit.MILLISECONDS.sleep(interval);
                 }
             } catch (Exception e) {
-                LOGGER.error("monitor {} error:", this.fileReaderOperator.file.getName(), e);
+                LOGGER.error(String.format("monitor %s error", fileReaderOperator.file.getName()), e);
             }
         }
 
         private void listen() throws IOException {
             BasicFileAttributes attributesAfter;
             String currentPath;
+            File file = fileReaderOperator.file;
             try {
-                attributesAfter = Files
-                        .readAttributes(this.fileReaderOperator.file.toPath(), BasicFileAttributes.class);
-                currentPath = this.fileReaderOperator.file.getCanonicalPath();
+                attributesAfter = Files.readAttributes(file.toPath(), BasicFileAttributes.class);
+                currentPath = file.getCanonicalPath();
             } catch (Exception e) {
-                // Set position 0 when split file
-                this.fileReaderOperator.position = 0;
-                LOGGER.error("monitor {} error, reset position is 0:", this.fileReaderOperator.file.getName(), e);
+                // set position 0 when split file
+                fileReaderOperator.position = 0;
+                LOGGER.error(String.format("monitor file %s error, reset position to 0", file.getName()), e);
                 return;
             }
-            // If change symbolic links
+
+            // if change symbolic links
             if (attributesAfter.isSymbolicLink() && !path.equals(currentPath)) {
-                this.fileReaderOperator.position = 0;
+                fileReaderOperator.position = 0;
                 path = currentPath;
             }
             if (attributesBefore.lastModifiedTime().compareTo(attributesAfter.lastModifiedTime()) < 0) {
-                // Not triggered during data sending
-                if (Objects.nonNull(this.fileReaderOperator.iterator) && this.fileReaderOperator.iterator
-                        .hasNext()) {
-                    return;
-                }
-                this.textFileReader.getData();
-                this.textFileReader.mergeData(this.fileReaderOperator);
-                this.attributesBefore = attributesAfter;
-                this.fileReaderOperator.iterator = fileReaderOperator.stream.iterator();
+                // not triggered during data sending
+                getFileData();
+                attributesBefore = attributesAfter;
+                return;
+            }
+            lastFlushData();
+        }
+
+        private void lastFlushData() throws IOException {
+            long currentTime = System.currentTimeMillis();
+            if (interval * 100 > currentTime - lastFlushTime) {
+                return;
+            }
+            getFileData();
+        }
+
+        private void getFileData() throws IOException {
+            if (fileReaderOperator.iterator != null && fileReaderOperator.iterator.hasNext()) {
+                return;
             }
+            this.textFileReader.getData();
+            this.textFileReader.mergeData(this.fileReaderOperator);
+            this.fileReaderOperator.iterator = fileReaderOperator.stream.iterator();
+            this.lastFlushTime = System.currentTimeMillis();
         }
     }
 }