Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/11/28 11:59:22 UTC

[GitHub] [inlong] yunqingmoswu opened a new pull request, #6655: [INLONG-6654][Sort] Supports s3 side-output for dirty data

yunqingmoswu opened a new pull request, #6655:
URL: https://github.com/apache/inlong/pull/6655

   ### Prepare a Pull Request
   *(Change the title to match the following example)*
   
   - Title: [INLONG-6654][Sort] Supports s3 side-output for dirty data
   
   *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)*
   
   - Fixes #6654
   
   ### Motivation
   
   Supports s3 side-output for dirty data so that dirty data can be archived to s3.
   The design is as follows (the common dirty-data side-output process is described in https://github.com/apache/inlong/pull/6618):
   1. Add the s3 side-output implementation
       This part caches dirty data and flushes it to s3 periodically (see the lifecycle sketch after this list).
   2. Add an s3 side-output factory to create s3 side-output instances
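
   A minimal lifecycle sketch, assuming the connector already holds the `s3Options`
   and `physicalRowDataType` of its sink table; the open/invoke/close calls match
   the `DirtySink` interface in this PR, while the surrounding wiring is illustrative:

   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.table.data.RowData;
   import org.apache.flink.table.types.DataType;
   import org.apache.inlong.sort.base.dirty.DirtyData;
   import org.apache.inlong.sort.base.dirty.sink.DirtySink;
   import org.apache.inlong.sort.base.dirty.sink.s3.S3DirtySink;
   import org.apache.inlong.sort.base.dirty.sink.s3.S3Options;

   void sideOutputSketch(S3Options s3Options, DataType physicalRowDataType,
           DirtyData<RowData> dirtyData) throws Exception {
       DirtySink<RowData> dirtySink = new S3DirtySink<>(s3Options, physicalRowDataType);
       dirtySink.open(new Configuration()); // builds the AmazonS3 client and starts the flush timer
       dirtySink.invoke(dirtyData);         // caches the record; it is flushed to s3 in batches
       dirtySink.close();                   // flushes the remaining records and stops the timer
   }
   ```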
   
   ### Modifications
   
   1. Add the core class S3DirtySink for the dirty data side-output
   2. Add S3DirtySinkFactory to create S3DirtySink
   3. Add S3Options to configure S3DirtySink
   4. Add S3Helper to handle the upload to s3
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
     *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   




[GitHub] [inlong] yunqingmoswu commented on pull request #6655: [INLONG-6654][Sort] Supports s3 side-output for dirty data

yunqingmoswu commented on PR #6655:
URL: https://github.com/apache/inlong/pull/6655#issuecomment-1329987333

   > I think that LogDirtySink and S3DirtySink should be refactored somewhere other than sort-base, because they are essentially two sink factories. Two questions: 1) will the InLong manager need to change to adapt to these changes? 2) when are these sinks created: in other sinks' connectors, when the manager parses the parameters, or when the task is created?
   > 
   > Consider adding some unit tests to make these dirty sink use cases a bit clearer?
   
   How to use it is explained in the 'log' PR. Unit tests cannot be added for this piece at present, because the dirty data output is a side path nested inside each connector.




[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6655: [INLONG-6654][Sort] Supports s3 side-output for dirty data

yunqingmoswu commented on code in PR #6655:
URL: https://github.com/apache/inlong/pull/6655#discussion_r1034372634


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_BATCH_BYTES;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_BATCH_INTERVAL;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_BATCH_SIZE;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LINE_DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_RETRIES;
+
+/**
+ * S3 dirty sink factory
+ */
+public class S3DirtySinkFactory implements DirtySinkFactory {
+
+    private static final String IDENTIFIER = "s3";

Review Comment:
   I removed the integrated DynamicTableFactory here, so it won't conflict with other connectors.





[GitHub] [inlong] yunqingmoswu merged pull request #6655: [INLONG-6654][Sort] Supports s3 side-output for dirty data

yunqingmoswu merged PR #6655:
URL: https://github.com/apache/inlong/pull/6655




[GitHub] [inlong] EMsnap commented on a diff in pull request #6655: [INLONG-6654][Sort] Supports s3 side-output for dirty data

EMsnap commented on code in PR #6655:
URL: https://github.com/apache/inlong/pull/6655#discussion_r1034350724


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.apache.inlong.sort.base.util.LabelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * S3 dirty sink that is used to sink dirty data to s3
+ *
+ * @param <T>
+ */
+public class S3DirtySink<T> implements DirtySink<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3DirtySink.class);
+
+    private final Map<String, List<String>> batchMap = new HashMap<>();
+    private final S3Options s3Options;
+    private final AtomicLong readInNum = new AtomicLong(0);
+    private final AtomicLong writeOutNum = new AtomicLong(0);
+    private final AtomicLong errorNum = new AtomicLong(0);
+    private final DataType physicalRowDataType;
+    private final RowData.FieldGetter[] fieldGetters;
+    private RowDataToJsonConverter converter;
+    private long batchBytes = 0L;
+    private int size;
+    private transient volatile boolean closed = false;
+    private transient volatile boolean flushing = false;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient S3Helper s3Helper;
+
+    public S3DirtySink(S3Options s3Options, DataType physicalRowDataType) {
+        this.s3Options = s3Options;
+        this.physicalRowDataType = physicalRowDataType;
+        final LogicalType[] logicalTypes = physicalRowDataType.getChildren()
+                .stream().map(DataType::getLogicalType).toArray(LogicalType[]::new);
+        this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
+        for (int i = 0; i < logicalTypes.length; i++) {
+            fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
+        }
+    }
+
+    @Override
+    public void open(Configuration configuration) throws Exception {
+        converter = new RowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, null)
+                .createConverter(physicalRowDataType.getLogicalType());
+        AmazonS3 s3Client;
+        if (s3Options.getAccessKeyId() != null && s3Options.getSecretKeyId() != null) {
+            BasicAWSCredentials awsCreds =
+                    new BasicAWSCredentials(s3Options.getAccessKeyId(), s3Options.getSecretKeyId());
+            s3Client = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
+                            s3Options.getEndpoint(),
+                            s3Options.getRegion()))
+                    .build();
+        } else {
+            s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(
+                    new AwsClientBuilder.EndpointConfiguration(s3Options.getEndpoint(), s3Options.getRegion())).build();
+        }
+        s3Helper = new S3Helper(s3Client, s3Options);
+        this.scheduler = new ScheduledThreadPoolExecutor(1,
+                new ExecutorThreadFactory("s3-dirty-sink"));
+        this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
+            if (!closed && !flushing) {
+                flush();
+            }
+        }, s3Options.getBatchIntervalMs(), s3Options.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public synchronized void invoke(DirtyData<T> dirtyData) throws Exception {
+        addBatch(dirtyData);
+        if (valid() && !flushing) {
+            flush();
+        }
+    }
+
+    private boolean valid() {
+        return (s3Options.getBatchSize() > 0 && size >= s3Options.getBatchSize())
+                || batchBytes >= s3Options.getMaxBatchBytes();
+    }
+
+    private void addBatch(DirtyData<T> dirtyData) throws IOException {
+        readInNum.incrementAndGet();
+        String value;
+        Map<String, String> labelMap = LabelUtils.parseLabels(dirtyData.getLabels());
+        T data = dirtyData.getData();
+        if (data instanceof RowData) {
+            value = format((RowData) data, labelMap);
+        } else if (data instanceof JsonNode) {
+            value = format((JsonNode) data, labelMap);
+        } else {
+            // Only support csv format when the row is not a 'RowData' and 'JsonNode'
+            value = FormatUtils.csvFormat(data, labelMap, s3Options.getFieldDelimiter());
+        }
+        if (s3Options.enableDirtyLog()) {
+            LOG.info("[{}] {}", dirtyData.getLogTag(), value);
+        }
+        batchBytes += value.getBytes(UTF_8).length;
+        size++;
+        batchMap.computeIfAbsent(dirtyData.getIdentifier(), k -> new ArrayList<>()).add(value);
+    }
+
+    private String format(RowData data, Map<String, String> labels) throws JsonProcessingException {
+        String value;
+        switch (s3Options.getFormat()) {
+            case "csv":
+                value = FormatUtils.csvFormat(data, fieldGetters, labels, s3Options.getFieldDelimiter());
+                break;
+            case "json":
+                value = FormatUtils.jsonFormat(data, converter, labels);
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Unsupported format for: %s", s3Options.getFormat()));
+        }
+        return value;
+    }
+
+    private String format(JsonNode data, Map<String, String> labels) throws JsonProcessingException {
+        String value;
+        switch (s3Options.getFormat()) {
+            case "csv":
+                value = FormatUtils.csvFormat(data, labels, s3Options.getFieldDelimiter());
+                break;
+            case "json":
+                value = FormatUtils.jsonFormat(data, labels);
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Unsupported format for: %s", s3Options.getFormat()));
+        }
+        return value;
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        if (!closed) {
+            closed = true;
+            if (this.scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                this.scheduler.shutdown();
+            }
+            try {
+                flush();
+            } catch (Exception e) {
+                LOG.warn("Writing records to s3 failed.", e);
+                throw new RuntimeException("Writing records to s3 failed.", e);
+            }
+        }
+    }
+
+    /**
+     * Flush data to s3
+     */
+    public synchronized void flush() {
+        flushing = true;
+        if (!hasRecords()) {
+            flushing = false;
+            return;
+        }
+        for (Entry<String, List<String>> kvs : batchMap.entrySet()) {
+            flushSingleIdentifier(kvs.getKey(), kvs.getValue());
+        }
+        batchMap.clear();
+        batchBytes = 0;
+        size = 0;
+        flushing = false;
+        LOG.info("S3 dirty sink statistics: readInNum: {}, writeOutNum: {}, errorNum: {}",
+                readInNum.get(), writeOutNum.get(), errorNum.get());
+    }
+
+    /**
+     * Flush data of single identifier to s3
+     *
+     * @param identifier The identifier of dirty data
+     * @param values The values of the identifier
+     */
+    private void flushSingleIdentifier(String identifier, List<String> values) {
+        if (values == null || values.isEmpty()) {
+            return;
+        }
+        String content = null;
+        try {
+            content = StringUtils.join(values, s3Options.getLineDelimiter());
+            s3Helper.upload(identifier, content);
+            LOG.info("Write {} records to s3 of identifier: {}", values.size(), identifier);
+            writeOutNum.addAndGet(values.size());
+            // Clean the data that has been loaded.
+            values.clear();
+        } catch (Exception e) {
+            errorNum.addAndGet(values.size());
+            if (!s3Options.ignoreSideOutputErrors()) {
+                throw new RuntimeException(
+                        String.format("Writing records to s3 of identifier:%s failed, the value: %s.",
+                                identifier, content),
+                        e);
+            }
+            LOG.warn("Writing records to s3 of identifier:{} failed and the dirty data will be throw away in the future"
+                    + " because the option 'sink.dirty.ignore-errors' is 'true'", identifier);
+        }
+    }
+
+    private boolean hasRecords() {
+        if (batchMap.isEmpty()) {
+            return false;
+        }
+        boolean hasRecords = false;

Review Comment:
   This can be simplified to:
   ```java
   for (List<String> value : batchMap.values()) {
       if (!value.isEmpty()) {
           return true;
       }
   }
   return false;
   ```
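
   An equivalent stream form, if preferred (same behavior; a sketch, not part of this PR):

   ```java
   // True if any identifier still has buffered records.
   return batchMap.values().stream().anyMatch(values -> !values.isEmpty());
   ```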





[GitHub] [inlong] Yizhou-Yang commented on pull request #6655: [INLONG-6654][Sort] Supports s3 side-output for dirty data

Yizhou-Yang commented on PR #6655:
URL: https://github.com/apache/inlong/pull/6655#issuecomment-1329963101

   I think that LogDirtySink and S3DirtySink should be refactored somewhere other than sort-base, because they are essentially two sink factories. Two questions: 1) will the InLong manager need to change to adapt to these changes? 2) when are these sinks created: in other sinks' connectors, when the manager parses the parameters, or when the task is created?




[GitHub] [inlong] gong commented on a diff in pull request #6655: [INLONG-6654][Sort] Supports s3 side-output for dirty data

gong commented on code in PR #6655:
URL: https://github.com/apache/inlong/pull/6655#discussion_r1034294776


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.apache.inlong.sort.base.util.LabelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * S3 dirty sink that is used to sink dirty data to s3
+ *
+ * @param <T>
+ */
+public class S3DirtySink<T> implements DirtySink<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3DirtySink.class);
+
+    private final Map<String, List<String>> batchMap = new HashMap<>();
+    private final S3Options s3Options;
+    private final AtomicLong readInNum = new AtomicLong(0);
+    private final AtomicLong writeOutNum = new AtomicLong(0);
+    private final AtomicLong errorNum = new AtomicLong(0);
+    private final DataType physicalRowDataType;
+    private final RowData.FieldGetter[] fieldGetters;
+    private RowDataToJsonConverter converter;
+    private long batchBytes = 0L;
+    private int size;
+    private transient volatile boolean closed = false;
+    private transient volatile boolean flushing = false;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient S3Helper s3Helper;
+
+    public S3DirtySink(S3Options s3Options, DataType physicalRowDataType) {
+        this.s3Options = s3Options;
+        this.physicalRowDataType = physicalRowDataType;
+        final LogicalType[] logicalTypes = physicalRowDataType.getChildren()
+                .stream().map(DataType::getLogicalType).toArray(LogicalType[]::new);
+        this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
+        for (int i = 0; i < logicalTypes.length; i++) {
+            fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
+        }
+    }
+
+    @Override
+    public void open(Configuration configuration) throws Exception {
+        converter = new RowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, null)
+                .createConverter(physicalRowDataType.getLogicalType());
+        AmazonS3 s3Client;
+        if (s3Options.getAccessKeyId() != null && s3Options.getSecretKeyId() != null) {
+            BasicAWSCredentials awsCreds =
+                    new BasicAWSCredentials(s3Options.getAccessKeyId(), s3Options.getSecretKeyId());
+            s3Client = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
+                            s3Options.getEndpoint(),
+                            s3Options.getRegion()))
+                    .build();
+        } else {
+            s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(
+                    new AwsClientBuilder.EndpointConfiguration(s3Options.getEndpoint(), s3Options.getRegion())).build();
+        }
+        s3Helper = new S3Helper(s3Client, s3Options);
+        this.scheduler = new ScheduledThreadPoolExecutor(1,
+                new ExecutorThreadFactory("s3-dirty-sink"));
+        this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
+            if (!closed && !flushing) {
+                flush();
+            }
+        }, s3Options.getBatchIntervalMs(), s3Options.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public synchronized void invoke(DirtyData<T> dirtyData) throws Exception {
+        addBatch(dirtyData);

Review Comment:
   Wrap `addBatch` in a try-catch, and rethrow the exception only when `!s3Options.ignoreSideOutputErrors()`.
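
   A sketch of what that could look like (the names are taken from the surrounding class; the warn message is illustrative):

   ```java
   @Override
   public synchronized void invoke(DirtyData<T> dirtyData) throws Exception {
       try {
           addBatch(dirtyData);
       } catch (Exception e) {
           if (!s3Options.ignoreSideOutputErrors()) {
               throw e;
           }
           // Errors are ignored by configuration, so the record is dropped.
           LOG.warn("Adding dirty data to the s3 batch failed, the record is dropped", e);
       }
       if (valid() && !flushing) {
           flush();
       }
   }
   ```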





[GitHub] [inlong] gong commented on a diff in pull request #6655: [INLONG-6654][Sort] Supports s3 side-output for dirty data

gong commented on code in PR #6655:
URL: https://github.com/apache/inlong/pull/6655#discussion_r1034276964


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Random;
+
+/**
+ * S3 helper class, it helps write to s3
+ */
+public class S3Helper implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3DirtySink.class);
+
+    private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
+
+    private static final int SEQUENCE_LENGTH = 4;
+    private static final String ESCAPE_PATTERN = "[\\pP\\p{Punct}\\s]";
+    private static final String FILE_NAME_SUFFIX = ".txt";
+    private final Random r = new Random();
+    private final AmazonS3 s3Client;
+    private final S3Options s3Options;
+
+    S3Helper(AmazonS3 s3Client, S3Options s3Options) {
+        this.s3Client = s3Client;
+        this.s3Options = s3Options;
+    }
+
+    /**
+     * Upload data to s3
+     *
+     * @param identifier The identifier of dirty data
+     * @param content The content that will be upload
+     * @throws IOException The exception may be thrown when executing
+     */
+    public void upload(String identifier, String content) throws IOException {
+        String path = genFileName(identifier);
+        for (int i = 0; i <= s3Options.getMaxRetries(); i++) {

Review Comment:
   Change `i <= s3Options.getMaxRetries()` to `i < s3Options.getMaxRetries()`, because `i` starts from 0.
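
   A sketch of the corrected loop (the bound is the point; the upload call and the `getBucket()` accessor are assumptions about the surrounding code):

   ```java
   // 'i' counts attempts from 0, so '<' yields exactly getMaxRetries() attempts.
   for (int i = 0; i < s3Options.getMaxRetries(); i++) {
       try {
           s3Client.putObject(s3Options.getBucket(), path, content); // assumed accessor
           return;
       } catch (Exception e) {
           LOG.error("Upload to s3 failed on attempt {}", i, e);
       }
   }
   throw new IOException("Writing to s3 failed after all retries: " + path);
   ```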





[GitHub] [inlong] EMsnap commented on a diff in pull request #6655: [INLONG-6654][Sort] Supports s3 side-output for dirty data

EMsnap commented on code in PR #6655:
URL: https://github.com/apache/inlong/pull/6655#discussion_r1034346334


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.apache.inlong.sort.base.util.LabelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * S3 dirty sink that is used to sink dirty data to s3
+ *
+ * @param <T>
+ */
+public class S3DirtySink<T> implements DirtySink<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3DirtySink.class);

Review Comment:
   Please use a logger corresponding to its own class.



##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Random;
+
+/**
+ * S3 helper class, it helps write to s3
+ */
+public class S3Helper implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3DirtySink.class);

Review Comment:
   Please use a logger corresponding to its own class; here `S3Helper` references `S3DirtySink.class`.
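
   I.e., the declaration in `S3Helper` would become:

   ```java
   private static final Logger LOG = LoggerFactory.getLogger(S3Helper.class);
   ```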





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6655: [INLONG-6654][Sort] Supports s3 side-output for dirty data

yunqingmoswu commented on code in PR #6655:
URL: https://github.com/apache/inlong/pull/6655#discussion_r1034231078


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.apache.inlong.sort.base.util.LabelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * S3 dirty sink that is used to sink dirty data to s3
+ *
+ * @param <T>
+ */
+public class S3DirtySink<T> implements DirtySink<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3DirtySink.class);
+
+    private final Map<String, List<String>> batchMap = new HashMap<>();
+    private final S3Options s3Options;
+    private final AtomicLong readInNum = new AtomicLong(0);
+    private final AtomicLong writeOutNum = new AtomicLong(0);
+    private final AtomicLong errorNum = new AtomicLong(0);
+    private final DataType physicalRowDataType;
+    private final RowData.FieldGetter[] fieldGetters;
+    private RowDataToJsonConverter converter;
+    private long batchBytes = 0L;
+    private int size;
+    private transient volatile boolean closed = false;
+    private transient volatile boolean flushing = false;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient S3Helper s3Helper;
+
+    public S3DirtySink(S3Options s3Options, DataType physicalRowDataType) {
+        this.s3Options = s3Options;
+        this.physicalRowDataType = physicalRowDataType;
+        final LogicalType[] logicalTypes = physicalRowDataType.getChildren()
+                .stream().map(DataType::getLogicalType).toArray(LogicalType[]::new);
+        this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
+        for (int i = 0; i < logicalTypes.length; i++) {
+            fieldGetters[i] = createFieldGetter(logicalTypes[i], i);

Review Comment:
   It is already like this.





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #6655: [INLONG-6654][Sort] Supports s3 side-output for dirty data

Yizhou-Yang commented on code in PR #6655:
URL: https://github.com/apache/inlong/pull/6655#discussion_r1034212948


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.apache.inlong.sort.base.util.LabelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * S3 dirty sink that is used to sink dirty data to s3
+ *
+ * @param <T>
+ */
+public class S3DirtySink<T> implements DirtySink<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3DirtySink.class);
+
+    private final Map<String, List<String>> batchMap = new HashMap<>();
+    private final S3Options s3Options;
+    private final AtomicLong readInNum = new AtomicLong(0);
+    private final AtomicLong writeOutNum = new AtomicLong(0);
+    private final AtomicLong errorNum = new AtomicLong(0);
+    private final DataType physicalRowDataType;
+    private final RowData.FieldGetter[] fieldGetters;
+    private RowDataToJsonConverter converter;
+    private long batchBytes = 0L;
+    private int size;
+    private transient volatile boolean closed = false;
+    private transient volatile boolean flushing = false;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient S3Helper s3Helper;
+
+    public S3DirtySink(S3Options s3Options, DataType physicalRowDataType) {
+        this.s3Options = s3Options;
+        this.physicalRowDataType = physicalRowDataType;
+        final LogicalType[] logicalTypes = physicalRowDataType.getChildren()
+                .stream().map(DataType::getLogicalType).toArray(LogicalType[]::new);
+        this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
+        for (int i = 0; i < logicalTypes.length; i++) {
+            fieldGetters[i] = createFieldGetter(logicalTypes[i], i);

Review Comment:
   suggested "RowData.createFieldGetter" instead of static method import





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #6655: [INLONG-6654][Sort] Supports s3 side-output for dirty data

Yizhou-Yang commented on code in PR #6655:
URL: https://github.com/apache/inlong/pull/6655#discussion_r1034212293


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.apache.inlong.sort.base.util.LabelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * S3 dirty sink that is used to sink dirty data to s3
+ *
+ * @param <T>
+ */
+public class S3DirtySink<T> implements DirtySink<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3DirtySink.class);
+
+    private final Map<String, List<String>> batchMap = new HashMap<>();
+    private final S3Options s3Options;
+    private final AtomicLong readInNum = new AtomicLong(0);
+    private final AtomicLong writeOutNum = new AtomicLong(0);
+    private final AtomicLong errorNum = new AtomicLong(0);
+    private final DataType physicalRowDataType;
+    private final RowData.FieldGetter[] fieldGetters;
+    private RowDataToJsonConverter converter;
+    private long batchBytes = 0L;
+    private int size;
+    private transient volatile boolean closed = false;
+    private transient volatile boolean flushing = false;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient S3Helper s3Helper;
+
+    public S3DirtySink(S3Options s3Options, DataType physicalRowDataType) {
+        this.s3Options = s3Options;
+        this.physicalRowDataType = physicalRowDataType;
+        final LogicalType[] logicalTypes = physicalRowDataType.getChildren()
+                .stream().map(DataType::getLogicalType).toArray(LogicalType[]::new);
+        this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
+        for (int i = 0; i < logicalTypes.length; i++) {
+            fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
+        }
+    }
+
+    @Override
+    public void open(Configuration configuration) throws Exception {
+        converter = new RowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, null)
+                .createConverter(physicalRowDataType.getLogicalType());
+        AmazonS3 s3Client;
+        if (s3Options.getAccessKeyId() != null && s3Options.getSecretKeyId() != null) {
+            BasicAWSCredentials awsCreds =
+                    new BasicAWSCredentials(s3Options.getAccessKeyId(), s3Options.getSecretKeyId());

Review Comment:
   Should the user pass the access key and secret key in the options directly? Will there be security problems?





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6655: [INLONG-6654][Sort] Supports s3 side-output for dirty data

yunqingmoswu commented on code in PR #6655:
URL: https://github.com/apache/inlong/pull/6655#discussion_r1034230678


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.apache.inlong.sort.base.util.LabelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * S3 dirty sink that is used to sink dirty data to s3
+ *
+ * @param <T>
+ */
+public class S3DirtySink<T> implements DirtySink<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3DirtySink.class);
+
+    private final Map<String, List<String>> batchMap = new HashMap<>();
+    private final S3Options s3Options;
+    private final AtomicLong readInNum = new AtomicLong(0);
+    private final AtomicLong writeOutNum = new AtomicLong(0);
+    private final AtomicLong errorNum = new AtomicLong(0);
+    private final DataType physicalRowDataType;
+    private final RowData.FieldGetter[] fieldGetters;
+    private RowDataToJsonConverter converter;
+    private long batchBytes = 0L;
+    private int size;
+    private transient volatile boolean closed = false;
+    private transient volatile boolean flushing = false;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient S3Helper s3Helper;
+
+    public S3DirtySink(S3Options s3Options, DataType physicalRowDataType) {
+        this.s3Options = s3Options;
+        this.physicalRowDataType = physicalRowDataType;
+        final LogicalType[] logicalTypes = physicalRowDataType.getChildren()
+                .stream().map(DataType::getLogicalType).toArray(LogicalType[]::new);
+        this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
+        for (int i = 0; i < logicalTypes.length; i++) {
+            fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
+        }
+    }
+
+    @Override
+    public void open(Configuration configuration) throws Exception {
+        converter = new RowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, null)
+                .createConverter(physicalRowDataType.getLogicalType());
+        AmazonS3 s3Client;
+        if (s3Options.getAccessKeyId() != null && s3Options.getSecretKeyId() != null) {
+            BasicAWSCredentials awsCreds =
+                    new BasicAWSCredentials(s3Options.getAccessKeyId(), s3Options.getSecretKeyId());

Review Comment:
   It can also access S3 using configuration from the environment, not only explicit keys.
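
   A minimal sketch of what such a fallback might look like, assuming the
   v1 SDK's com.amazonaws.auth.DefaultAWSCredentialsProviderChain (which
   resolves credentials from environment variables, system properties,
   profile files or the instance profile); the getRegion() getter on
   S3Options is illustrative, not part of this PR:

       AmazonS3 s3Client;
       if (s3Options.getAccessKeyId() != null && s3Options.getSecretKeyId() != null) {
           // Explicit static credentials from the connector options
           BasicAWSCredentials awsCreds =
                   new BasicAWSCredentials(s3Options.getAccessKeyId(), s3Options.getSecretKeyId());
           s3Client = AmazonS3ClientBuilder.standard()
                   .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
                   .withRegion(s3Options.getRegion()) // illustrative getter
                   .build();
       } else {
           // Fall back to the default provider chain, which picks up
           // credentials from the environment without explicit keys
           s3Client = AmazonS3ClientBuilder.standard()
                   .withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
                   .withRegion(s3Options.getRegion()) // illustrative getter
                   .build();
       }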



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] EMsnap commented on a diff in pull request #6655: [INLONG-6654][Sort] Supports s3 side-output for dirty data

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6655:
URL: https://github.com/apache/inlong/pull/6655#discussion_r1034337474


##########
licenses/inlong-sort-connectors/LICENSE:
##########
@@ -844,7 +844,7 @@ The text of each license is the standard Apache 2.0 license.
   com.tencentcloudapi:tencentcloud-sdk-java:3.1.545 - tencentcloud-sdk-java (https://github.com/TencentCloud/tencentcloud-sdk-java), (The Apache Software License, Version 2.0)
   com.qcloud:dlc-data-catalog-metastore-client:1.1.1 - dlc-data-catalog-metastore-client (https://mvnrepository.com/artifact/com.qcloud/dlc-data-catalog-metastore-client/1.1), (The Apache Software License, Version 2.0)
   org.apache.doris:flink-doris-connector-1.13_2.11:1.0.3 - Flink Connector for Apache Doris (https://github.com/apache/doris-flink-connector/tree/1.13_2.11-1.0.3), (The Apache Software License, Version 2.0)
-
+  com.amazonaws:aws-java-sdk-s3:jar:1.12.7 - AWS Java SDK for Amazon S3 (https://aws.amazon.com/sdkforjava), (The Apache Software License, Version 2.0)

Review Comment:
   Version mismatch: please update it to 1.12.346 to match the actual dependency.
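
   That is, the entry would read:

       com.amazonaws:aws-java-sdk-s3:jar:1.12.346 - AWS Java SDK for Amazon S3 (https://aws.amazon.com/sdkforjava), (The Apache Software License, Version 2.0)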



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] EMsnap commented on a diff in pull request #6655: [INLONG-6654][Sort] Supports s3 side-output for dirty data

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6655:
URL: https://github.com/apache/inlong/pull/6655#discussion_r1034346334


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.apache.inlong.sort.base.util.LabelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * S3 dirty sink used to write dirty data to S3
+ *
+ * @param <T> the type of dirty records handled by this sink
+ */
+public class S3DirtySink<T> implements DirtySink<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3DirtySink.class);

Review Comment:
   Please use LOGGER as the constant name, consistent with the convention in other classes.
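
   For example:

       private static final Logger LOGGER = LoggerFactory.getLogger(S3DirtySink.class);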



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] EMsnap commented on a diff in pull request #6655: [INLONG-6654][Sort] Supports s3 side-output for dirty data

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6655:
URL: https://github.com/apache/inlong/pull/6655#discussion_r1034347594


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_BATCH_BYTES;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_BATCH_INTERVAL;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_BATCH_SIZE;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LINE_DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_RETRIES;
+
+/**
+ * S3 dirty sink factory
+ */
+public class S3DirtySinkFactory implements DirtySinkFactory {
+
+    private static final String IDENTIFIER = "s3";

Review Comment:
   Maybe we should make the name consistent with the other connectors, e.g. s3-inlong, and likewise log-inlong for the log sink.
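
   For example:

       private static final String IDENTIFIER = "s3-inlong";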



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org