You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by "dockerzhang (via GitHub)" <gi...@apache.org> on 2023/03/06 10:44:21 UTC

[GitHub] [inlong] dockerzhang commented on a diff in pull request #7524: [INLONG-7519][Audit] Proxy support Kafka

dockerzhang commented on code in PR #7524:
URL: https://github.com/apache/inlong/pull/7524#discussion_r1126228939


##########
inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.audit.base.HighPriorityThreadFactory;
+import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
+import org.apache.inlong.common.util.NetworkUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class KafkaSink extends AbstractSink implements Configurable {
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);
+
+    // for kafka producer
+    private static Properties properties = new Properties();
+    private static final String BOOTSTRAP_SERVER = "bootstrap_servers";
+    private static final String TOPIC = "topic";
+    private static final String RETRIES = "retries";
+    private static final String BATCH_SIZE = "batch_size";
+    private static final String LINGER_MS = "linger_ms";
+    private static final String BUFFER_MEMORY = "buffer_memory";
+    private static final String defaultRetries = "0";
+    private static final String defaultBatchSize = "16384";
+    private static final String defaultLingerMs = "0";
+    private static final String defaultBufferMemory = "33554432";
+    private static final String defaultAcks = "all";
+
+    private static final Long PRINT_INTERVAL = 30L;
+    private static final KafkaPerformanceTask kafkaPerformanceTask = new KafkaPerformanceTask();
+    private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1,
+            new HighPriorityThreadFactory("kafkaPerformance-Printer-thread"));
+
+    private KafkaProducer<String, byte[]> producer;
+    public Map<String, KafkaProducer<String, byte[]>> producerMap;
+    private SinkCounter sinkCounter;
+    private String topic;
+    private volatile boolean canSend = false;
+    private volatile boolean canTake = false;
+    private int threadNum;
+    private Thread[] sinkThreadPool;
+
+    private static final int BAD_EVENT_QUEUE_SIZE = 10000;
+    private static final int EVENT_QUEUE_SIZE = 1000;
+    private static final int DEFAULT_LOG_EVERY_N_EVENTS = 100000;
+    private LinkedBlockingQueue<EventStat> resendQueue;
+    private LinkedBlockingQueue<Event> eventQueue;
+
+    // for log
+    private Integer logEveryNEvents;
+    private long diskIORatePerSec;
+    private RateLimiter diskRateLimiter;
+
+    // properties for stat
+    private static final String LOG_EVERY_N_EVENTS = "log_every_n_events";
+    private static final String DISK_IO_RATE_PER_SEC = "disk_io_rate_per_sec";
+    private static final String SINK_THREAD_NUM = "thread-num";
+
+    // for stas
+    private AtomicLong currentSuccessSendCnt = new AtomicLong(0);
+    private AtomicLong lastSuccessSendCnt = new AtomicLong(0);
+    private long t1 = System.currentTimeMillis();
+    private long t2 = 0L;
+    private static AtomicLong totalKafkaSuccSendCnt = new AtomicLong(0);
+    private static AtomicLong totalKafkaSuccSendSize = new AtomicLong(0);
+
+    private boolean overflow = false;
+
+    private String localIp = "127.0.0.1";
+
+    static {
+        // stat kafka performance
+        System.out.println("kafkaPerformanceTask!!!!!!");

Review Comment:
   Remove the `System.out.println` debug statement (the `sout`); if this output is still needed, emit it through the SLF4J `logger` instead.



##########
inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.audit.base.HighPriorityThreadFactory;
+import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
+import org.apache.inlong.common.util.NetworkUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class KafkaSink extends AbstractSink implements Configurable {
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);
+
+    // for kafka producer
+    private static Properties properties = new Properties();
+    private static final String BOOTSTRAP_SERVER = "bootstrap_servers";
+    private static final String TOPIC = "topic";
+    private static final String RETRIES = "retries";
+    private static final String BATCH_SIZE = "batch_size";
+    private static final String LINGER_MS = "linger_ms";
+    private static final String BUFFER_MEMORY = "buffer_memory";
+    private static final String defaultRetries = "0";
+    private static final String defaultBatchSize = "16384";
+    private static final String defaultLingerMs = "0";
+    private static final String defaultBufferMemory = "33554432";
+    private static final String defaultAcks = "all";
+
+    private static final Long PRINT_INTERVAL = 30L;
+    private static final KafkaPerformanceTask kafkaPerformanceTask = new KafkaPerformanceTask();
+    private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1,
+            new HighPriorityThreadFactory("kafkaPerformance-Printer-thread"));
+
+    private KafkaProducer<String, byte[]> producer;
+    public Map<String, KafkaProducer<String, byte[]>> producerMap;
+    private SinkCounter sinkCounter;
+    private String topic;
+    private volatile boolean canSend = false;
+    private volatile boolean canTake = false;
+    private int threadNum;
+    private Thread[] sinkThreadPool;
+
+    private static final int BAD_EVENT_QUEUE_SIZE = 10000;
+    private static final int EVENT_QUEUE_SIZE = 1000;
+    private static final int DEFAULT_LOG_EVERY_N_EVENTS = 100000;
+    private LinkedBlockingQueue<EventStat> resendQueue;
+    private LinkedBlockingQueue<Event> eventQueue;
+
+    // for log
+    private Integer logEveryNEvents;
+    private long diskIORatePerSec;
+    private RateLimiter diskRateLimiter;
+
+    // properties for stat
+    private static final String LOG_EVERY_N_EVENTS = "log_every_n_events";
+    private static final String DISK_IO_RATE_PER_SEC = "disk_io_rate_per_sec";
+    private static final String SINK_THREAD_NUM = "thread-num";
+
+    // for stas
+    private AtomicLong currentSuccessSendCnt = new AtomicLong(0);
+    private AtomicLong lastSuccessSendCnt = new AtomicLong(0);
+    private long t1 = System.currentTimeMillis();
+    private long t2 = 0L;
+    private static AtomicLong totalKafkaSuccSendCnt = new AtomicLong(0);
+    private static AtomicLong totalKafkaSuccSendSize = new AtomicLong(0);
+
+    private boolean overflow = false;
+
+    private String localIp = "127.0.0.1";
+
+    static {
+        // stat kafka performance
+        System.out.println("kafkaPerformanceTask!!!!!!");
+        scheduledExecutorService.scheduleWithFixedDelay(kafkaPerformanceTask, 0L,
+                PRINT_INTERVAL, TimeUnit.SECONDS);
+    }
+
+    public KafkaSink() {
+        super();
+        logger.debug("new instance of KafkaSink!");
+    }
+
+    @Override
+    public synchronized void start() {
+        logger.info("kafka sink starting...");
+        // create connection
+
+        sinkCounter.start();
+        super.start();
+        this.canSend = true;
+        this.canTake = true;
+
+        // init topic producer
+        initTopicProducer(topic);
+
+        for (int i = 0; i < sinkThreadPool.length; i++) {
+            sinkThreadPool[i] = new Thread(new SinkTask(), getName() + "_tube_sink_sender-" + i);
+            sinkThreadPool[i].start();
+        }
+        logger.debug("kafka sink started");
+    }
+
+    @Override
+    public synchronized void stop() {
+        logger.info("kafka sink stopping");
+        // stop connection
+        this.canTake = false;
+        int waitCount = 0;
+        while (eventQueue.size() != 0 && waitCount++ < 10) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                logger.info("Stop thread has been interrupt!");
+                break;
+            }
+        }
+        this.canSend = false;
+
+        if (sinkThreadPool != null) {
+            for (Thread thread : sinkThreadPool) {
+                if (thread != null) {
+                    thread.interrupt();
+                }
+            }
+            sinkThreadPool = null;
+        }
+        super.stop();
+        if (!scheduledExecutorService.isShutdown()) {
+            scheduledExecutorService.shutdown();
+        }
+        sinkCounter.stop();
+        logger.debug("kafka sink stopped. Metrics:{}", sinkCounter);
+    }
+
+    @Override
+    public Status process() {
+        logger.debug("process......");

Review Comment:
   Make this log message more meaningful — for example, include the sink name and what is being processed, instead of the bare placeholder `"process......"`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org