You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/12/07 01:46:12 UTC
[rocketmq-streams] branch main updated: remove useless test
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new 74517f0 remove useless test
74517f0 is described below
commit 74517f05257e72f965862a170388e63f7e290aa2
Author: duhenglucky <du...@apache.org>
AuthorDate: Tue Dec 7 09:45:42 2021 +0800
remove useless test
---
pom.xml | 6 -
rocketmq-streams-channel-kafka/pom.xml | 48 ----
.../apache/rocketmq/streams/kafka/KafkaSplit.java | 55 -----
.../streams/kafka/KakfaChannelBuilder.java | 89 -------
.../rocketmq/streams/kafka/sink/KafkaSink.java | 226 -----------------
.../rocketmq/streams/kafka/source/KafkaSource.java | 275 ---------------------
.../rocketmq/streams/kafka/KafkaChannelTest.java | 104 --------
.../src/test/resources/log4j.xml | 36 ---
8 files changed, 839 deletions(-)
diff --git a/pom.xml b/pom.xml
index 93fbda0..55934bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,7 +57,6 @@
<module>rocketmq-streams-channel-syslog</module>
<module>rocketmq-streams-channel-es</module>
<module>rocketmq-streams-runner</module>
- <module>rocketmq-streams-channel-kafka</module>
</modules>
<properties>
@@ -312,11 +311,6 @@
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-streams-channel-kafka</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-connectors</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/rocketmq-streams-channel-kafka/pom.xml b/rocketmq-streams-channel-kafka/pom.xml
deleted file mode 100644
index 309170d..0000000
--- a/rocketmq-streams-channel-kafka/pom.xml
+++ /dev/null
@@ -1,48 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>rocketmq-streams</artifactId>
- <groupId>org.apache.rocketmq</groupId>
- <version>1.0.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>rocketmq-streams-channel-kafka</artifactId>
- <name>ROCKETMQ STREAMS :: channel-kafka</name>
-
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- </properties>
-
-
- <dependencies>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.12</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-streams-commons</artifactId>
- </dependency>
- </dependencies>
-</project>
\ No newline at end of file
diff --git a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/KafkaSplit.java b/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/KafkaSplit.java
deleted file mode 100644
index 20fa396..0000000
--- a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/KafkaSplit.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.kafka;
-
-import com.alibaba.fastjson.JSONObject;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.rocketmq.streams.common.channel.split.ISplit;
-import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
-
-public class KafkaSplit extends BasedConfigurable implements ISplit<KafkaSplit, PartitionInfo> {
- protected PartitionInfo partitionInfo;
- protected String topic;
- protected int partition;
-
- @Override
- public String getQueueId() {
- return partition + "";
- }
-
-
- @Override
- public PartitionInfo getQueue() {
- return partitionInfo;
- }
-
- @Override
- public int compareTo(KafkaSplit o) {
- return partition - o.partition;
- }
-
- @Override
- protected void getJsonObject(JSONObject jsonObject) {
- super.getJsonObject(jsonObject);
- this.partitionInfo = new PartitionInfo(topic, partition, null, null, null);
- }
-
- public KafkaSplit(PartitionInfo partitionInfo) {
- this.partitionInfo = partitionInfo;
- this.partition = partitionInfo.partition();
- }
-}
diff --git a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/KakfaChannelBuilder.java b/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/KakfaChannelBuilder.java
deleted file mode 100644
index 2d947c5..0000000
--- a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/KakfaChannelBuilder.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.kafka;
-
-import com.alibaba.fastjson.JSONObject;
-import com.google.auto.service.AutoService;
-import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.model.ServiceName;
-import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
-import org.apache.rocketmq.streams.kafka.sink.KafkaSink;
-import org.apache.rocketmq.streams.kafka.source.KafkaSource;
-
-@AutoService(IChannelBuilder.class)
-@ServiceName(value = KakfaChannelBuilder.TYPE, aliasName = "KafkaSource")
-public class KakfaChannelBuilder extends AbstractSupportShuffleChannelBuilder {
- public static final String TYPE = "kakfa";
- /**
- * @param namespace
- * @param name
- * @param properties
- * @return
- */
- @Override
- public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
-
- KafkaSource kafkaSource = (KafkaSource) ConfigurableUtil.create(KafkaSource.class.getName(), namespace, name, createFormatProperty(properties), null);
- return kafkaSource;
- }
-
- @Override
- public String getType() {
- return TYPE;
- }
-
- @Override
- public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
- KafkaSink kafkaSink = (KafkaSink)ConfigurableUtil.create(KafkaSink.class.getName(), namespace, name, createFormatProperty(properties), null);
- return kafkaSink;
- }
-
- /**
- * 创建标准的属性文件
- *
- * @param properties
- * @return
- */
- protected JSONObject createFormatProperty(Properties properties) {
- JSONObject formatProperties = new JSONObject();
- for (Object object : properties.keySet()) {
- String key = (String)object;
- if ("type".equals(key)) {
- continue;
- }
- formatProperties.put(key, properties.getProperty(key));
- }
- IChannelBuilder.formatPropertiesName(formatProperties, properties, "groupName", "group.id");
- IChannelBuilder.formatPropertiesName(formatProperties, properties, "groupName", "consumerGroup");
- IChannelBuilder.formatPropertiesName(formatProperties, properties, "endpoint", "bootstrap.servers");
- IChannelBuilder.formatPropertiesName(formatProperties, properties, "maxThread", "thread.max.count");
- return formatProperties;
- }
-
- @Override
- public ISink createBySource(ISource pipelineSource) {
- KafkaSource kafkaSource = (KafkaSource)pipelineSource;
- String topic = kafkaSource.getTopic();
- KafkaSink sink = new KafkaSink(kafkaSource.getEndpoint(),topic);
- return sink;
- }
-}
diff --git a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/sink/KafkaSink.java b/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/sink/KafkaSink.java
deleted file mode 100644
index 3ae0967..0000000
--- a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/sink/KafkaSink.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.kafka.sink;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.ListTopicsResult;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
-import org.apache.rocketmq.streams.common.channel.split.ISplit;
-import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.kafka.KafkaSplit;
-import org.apache.rocketmq.streams.kafka.source.KafkaSource;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-
-public class KafkaSink extends AbstractSupportShuffleSink {
- private static final Log LOG = LogFactory.getLog(KafkaSource.class);
-
- private static final String PREFIX = "dipper.upgrade.channel.kafak.envkey";
-
- private transient List<KafkaConsumer<String, String>> consumers = new ArrayList<>();
- private transient KafkaProducer kafkaProducer;
- private transient Properties props;
-
- private static int maxPollRecords = 100;
- private volatile transient boolean stop = false;
- protected String topic;
- @ENVDependence
- private String endpoint;
- private int sessionTimeout = 30000;
- private transient ExecutorService executorService = null;
- private transient ConcurrentLinkedQueue<ConsumerRecord<String, String>> itemQueue = new ConcurrentLinkedQueue<ConsumerRecord<String, String>>();
-
- public KafkaSink() {}
-
- public KafkaSink(String endpoint, String topic) {
- this.endpoint = endpoint;
- this.topic = topic;
- }
-
- @Override
- protected boolean initConfigurable() {
- Properties props = new Properties();
- props.put("bootstrap.servers", endpoint);
- props.put("enable.auto.commit", false);
- props.put("acks", "1"); //针对kafka producer可以做一些参数优化
- props.put("linger.ms", 1);
- props.put("batch.size", 16384);
- props.put("session.timeout.ms", String.valueOf(sessionTimeout));
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- this.props = props;
-
- kafkaProducer = new KafkaProducer<String, String>(props);
-
- return true;
- }
-
- @Override
- protected boolean batchInsert(List<IMessage> messages) {
- if (messages == null) {
- return true;
- }
- for (IMessage message : messages) {
- putMessage2Mq(message);
- }
- return true;
- }
-
- protected void destroyProducer() {
- if (kafkaProducer != null) {
- try {
- kafkaProducer.close();
- } catch (Throwable t) {
- if (LOG.isWarnEnabled()) {
- LOG.warn(t.getMessage(), t);
- }
- }
- }
- }
-
- @Override
- public void destroy() {
- super.destroy();
- stop = true;
- destroyProducer();
- }
-
- @Override
- protected void createTopicIfNotExist(int splitNum) {
- AdminClient adminClient = null;
- try {
-
- Properties properties = new Properties();
- properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
- endpoint);
-
- adminClient = AdminClient.create(properties);
- ListTopicsResult result = adminClient.listTopics();
- Set<String> topics = result.names().get();
- if (topics.contains(topic)) {
- return;
- }
- NewTopic newTopic = new NewTopic(topic, splitNum, (short)1);
- adminClient.createTopics(Arrays.asList(newTopic));
-
- LOG.info("创建主题成功:" + topic);
- } catch (Exception ex) {
- ex.printStackTrace();
- } finally {
- if (adminClient != null) {
- adminClient.close();
- }
- }
- }
-
- @Override
- public String getShuffleTopicFieldName() {
- return "topic";
- }
-
- @Override
- public List<ISplit> getSplitList() {
- List<PartitionInfo> partitionInfos = kafkaProducer.partitionsFor(topic);
- List<ISplit> splits = new ArrayList<>();
- for (PartitionInfo partitionInfo : partitionInfos) {
- splits.add(new KafkaSplit(partitionInfo));
- }
- return splits;
- }
-
- protected boolean putMessage2Mq(IMessage fieldName2Value) {
- try {
-
- LOG.info(String.format("topic=%s, record=%s", topic, fieldName2Value.getMessageValue().toString()));
- ProducerRecord<String, String> records =
- new ProducerRecord<String, String>(topic, fieldName2Value.getMessageValue().toString());
- kafkaProducer.send(records, new Callback() {
-
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e != null) {
- // LOG.error("send kafka message error!topic=" + topic, e);
- } else {
- // LOG.info(String.format("send success topic=%s, record=%s", topic, jsonObject.toJSONString()));
- }
- }
- });
- } catch (Exception e) {
- LOG.error("send message error:" + fieldName2Value.getMessageValue().toString(), e);
- return false;
- }
- return true;
- }
-
- @Override
- public int getSplitNum() {
- return getSplitList().size();
- }
-
- public static int getMaxPollRecords() {
- return maxPollRecords;
- }
-
- public boolean isStop() {
- return stop;
- }
-
- public void setStop(boolean stop) {
- this.stop = stop;
- }
-
- public String getEndpoint() {
- return endpoint;
- }
-
- public void setEndpoint(String endpoint) {
- this.endpoint = endpoint;
- }
-
- public int getSessionTimeout() {
- return sessionTimeout;
- }
-
- public void setSessionTimeout(int sessionTimeout) {
- this.sessionTimeout = sessionTimeout;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
-}
diff --git a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/source/KafkaSource.java b/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/source/KafkaSource.java
deleted file mode 100644
index 7814a89..0000000
--- a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/source/KafkaSource.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.kafka.source;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import com.alibaba.fastjson.JSONObject;
-
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.rocketmq.streams.common.channel.source.AbstractSupportShuffleSource;
-import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-
-public class KafkaSource extends AbstractSupportShuffleSource {
-
- private static final Log LOG = LogFactory.getLog(KafkaSource.class);
-
- private static final String PREFIX = "dipper.upgrade.channel.kafak.envkey";
-
- private transient List<KafkaConsumer<String, String>> consumers = new ArrayList<>();
- private transient KafkaProducer kafkaProducer;
- private transient Properties props;
-
- private static int maxPollRecords = 100;
- private volatile transient boolean stop = false;
-
- @ENVDependence
- private String endpoint;
- private int sessionTimeout = 30000;
- private transient ExecutorService executorService = null;
- private transient ConcurrentLinkedQueue<ConsumerRecord<String, String>> itemQueue =
- new ConcurrentLinkedQueue<ConsumerRecord<String, String>>();
- //private int QUEUE_MAX_SIZE = 5000000;
-
- public KafkaSource() {
- }
-
- public KafkaSource(String endpoint, String topic, String groupName) {
- this.endpoint = endpoint;
- this.topic = topic;
- this.groupName = groupName;
- }
-
- @Override
- protected boolean initConfigurable() {
- Properties props = new Properties();
- props.put("bootstrap.servers", endpoint);
- props.put("group.id", groupName);//这里是GroupA或者GroupB
- props.put("enable.auto.commit", false);
- props.put("acks", "1"); //针对kafka producer可以做一些参数优化
- props.put("linger.ms", 1);
- props.put("batch.size", 16384);
- props.put("auto.commit.interval.ms", String.valueOf(checkpointTime));
- props.put("session.timeout.ms", String.valueOf(sessionTimeout));
- props.put("max.poll.records", String.valueOf(getMaxFetchLogGroupSize()));
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("auto.offset.reset", "earliest");
- props.put("key.serializer.encoding",getEncoding());
- props.put("value.serializer.encoding",getEncoding());
- this.props = props;
-
- kafkaProducer = new KafkaProducer<String, String>(props);
-
- return true;
- }
-
- @Override
- protected boolean startSource() {
- try {
- //kafka多线程消费会报“KafkaConsumer is not safe for multi-threaded access”,所以改成单线程拉取,多线程处理
- WorkerFunc workerFunc = new WorkerFunc();
- ExecutorService executorService = new ThreadPoolExecutor(getMaxThread(), getMaxThread(),
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(1000));
- executorService.execute(workerFunc);
- //处理
-
- } catch (Exception e) {
- setInitSuccess(false);
- LOG.error(e.getMessage(), e);
- destroy();
- throw new RuntimeException(" start kafka channel error " + topic);
- }
- return true;
- }
-
- protected class WorkerFunc implements Runnable {
- @Override
- public void run() {
- List<String> topicList = new ArrayList<>();
- topicList.add(topic);
- KafkaConsumer<String,String> consumer = new KafkaConsumer(props);
- consumer.subscribe(topicList);
- long lastUpgrade = System.currentTimeMillis();
- List<PartitionInfo> oldPartitionInfos = consumer.partitionsFor(topic);
- while (true && !stop) {
- try {
-
- ConsumerRecords<String, String> records = consumer.poll(1000);
- List<PartitionInfo> newPartitionInfos = consumer.partitionsFor(topic);
- messageQueueChanged(oldPartitionInfos, newPartitionInfos);
- Set<String> queueIds = new HashSet<>();
- for (ConsumerRecord<String,String> record : records) {
- try {
- // LOG.info(String.format("recordValue=%s", record.value()));
- queueIds.add(record.partition() + "");
- if(getHeaderFieldNames()!=null){
- Map<String,Object> parameters=new HashMap<>();
- parameters.put("messageKey",record.key());
- parameters.put("topic",record.topic());
- parameters.put("partition",record.partition());
- parameters.put("offset",record.offset());
- parameters.put("timestamp",record.timestamp());
- }
- JSONObject msg = create(record.value(),null);
- Message message = createMessage(msg, record.partition() + "", record.offset() + "", false);
- message.getHeader().setOffsetIsLong(true);
- executeMessage(message);
- doReceiveMessage(msg, false, record.partition() + "", record.offset() + "");
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
- if ((System.currentTimeMillis() - lastUpgrade) > checkpointTime) {
- sendCheckpoint(queueIds);
- consumer.commitAsync();
- lastUpgrade = System.currentTimeMillis();
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
-
- }
- }
- }
-
- public void messageQueueChanged(List<PartitionInfo> old, List<PartitionInfo> newPartitions) {
- Set<String> queueIds = new HashSet<>();
- Set<String> mqAll = new HashSet<>();
- Set<String> mqDivided = new HashSet<>();
- for (PartitionInfo partitionInfo : old) {
- mqAll.add(partitionInfo.partition() + "");
- }
- for (PartitionInfo partitionInfo : newPartitions) {
- mqDivided.add(partitionInfo.partition() + "");
- }
- for (String messageQueue : mqAll) {
- if (!mqDivided.contains(messageQueue)) {
- //ProcessQueue pq = this.processQueueTable.remove(messageQueue);
- //if (pq != null) {
- // pq.setDropped(true);
- // log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, messageQueue);
- //}
- queueIds.add(messageQueue);
- }
- }
- Set<String> newQueueIds = new HashSet<>();
- for (String messageQueue : mqDivided) {
- if (!mqAll.contains(messageQueue)) {
- newQueueIds.add(messageQueue + "");
- }
- }
- removeSplit(queueIds);
- addNewSplit(newQueueIds);
-
- }
-
- @Override
- public boolean supportNewSplitFind() {
- return true;
- }
-
- @Override
- public boolean supportRemoveSplitFind() {
- return true;
- }
-
- @Override
- public boolean supportOffsetRest() {
- return false;
- }
-
- @Override
- protected boolean isNotDataSplit(String queueId) {
- return false;
- }
-
- protected void destroyConsumer() {
- if (consumers != null && consumers.size() > 0) {
- for (KafkaConsumer consumer : consumers) {
- if (consumer != null) {
- consumer.close();
- }
- }
-
- }
- }
-
- @Override
- public void destroy() {
- super.destroy();
- stop = true;
- destroyConsumer();
- }
-
- public static int getMaxPollRecords() {
- return maxPollRecords;
- }
-
- public boolean isStop() {
- return stop;
- }
-
- public void setStop(boolean stop) {
- this.stop = stop;
- }
-
- public String getEndpoint() {
- return endpoint;
- }
-
- public void setEndpoint(String endpoint) {
- this.endpoint = endpoint;
- }
-
- @Override
- public void setMaxFetchLogGroupSize(int size) {
- super.setMaxFetchLogGroupSize(size);
- maxPollRecords = size;
- }
-
- public int getSessionTimeout() {
- return sessionTimeout;
- }
-
- public void setSessionTimeout(int sessionTimeout) {
- this.sessionTimeout = sessionTimeout;
- }
-
-}
diff --git a/rocketmq-streams-channel-kafka/src/test/java/org/apache/rocketmq/streams/kafka/KafkaChannelTest.java b/rocketmq-streams-channel-kafka/src/test/java/org/apache/rocketmq/streams/kafka/KafkaChannelTest.java
deleted file mode 100644
index cfaa1a6..0000000
--- a/rocketmq-streams-channel-kafka/src/test/java/org/apache/rocketmq/streams/kafka/KafkaChannelTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.kafka;
-
-import com.alibaba.fastjson.JSONObject;
-
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.rocketmq.streams.kafka.sink.KafkaSink;
-import org.apache.rocketmq.streams.kafka.source.KafkaSource;
-import org.junit.Test;
-
-/**
- * 本机搭建kafka服务端参考:https://www.cnblogs.com/BlueSkyyj/p/11425998.html
- **/
-public class KafkaChannelTest {
- private static final String END_POINT = "47.94.238.133:31013";
- private static final String TOPIC = "real_time_kafka_topic";
- //数据任务处理后topic:es_index_test,实时策略消息topic:real_time_kafka_topic
- private static final String GROUP_NAME = "test-090";
-
- @Test
- public void testKafkaReceive() throws InterruptedException {
- ISource channel = createSource();
- channel.start((message, context) -> {
- System.out.println(message.getMessageBody());
- return message;
- });
- while (true) {
- Thread.sleep(1000L);
- }
- }
-
- @Test
- public void testSendMessageToKafka() throws InterruptedException {
- ISink channel = createSink();
- String data = "{\"destIp\":\"10.252.30.235\",\"protocol\":\"tcp\",\"destPort\":\"10000\",\"sourceIp\":\"10"
- + ".232.1.1\",\"level\":\"high\",\"srcPort\":\"8000\",\"monitor_type\":\"auto\"}";
- //
- String alarmData = "{\"destIp\":\"10.252.30.235\",\"fixMethod\":\"测试策略,不用处理\",\"level\":\"urgent\","
- + "\"modelId\":\"666\",\"alarmTime\":\"2020-03-12 19:35:31\",\"alarmName\":\"完整流程测试策略名称\","
- + "\"srcPort\":\"8000\",\"source\":\"完整数据流串测-es通道-不要删除\",\"protocol\":\"tcp\",\"monitorType\":\"hand\","
- + "\"destPort\":\"10000\",\"sourceIp\":\"10.232.1.1\","
- + "\"fireRules\":[{\"ruleName\":\"pipeline_test_config_namestrategy_filter_stage\","
- + "\"ruleNameSpace\":\"paas_soc_namespace\"}],\"modelDesc\":\"测试策略,不要在意\",\"detail\":{\"destIp\":\"10.252"
- + ".30.235\",\"fixMethod\":\"测试策略,不用处理\",\"level\":\"urgent\",\"modelId\":\"666\","
- + "\"alarmTime\":\"2020-03-12 19:35:31\",\"alarmName\":\"完整流程测试策略名称\",\"srcPort\":\"8000\","
- + "\"source\":\"完整数据流串测-es通道-不要删除\",\"protocol\":\"tcp\",\"monitorType\":\"hand\",\"destPort\":\"10000\","
- + "\"sourceIp\":\"10.232.1.1\","
- + "\"fireRules\":[{\"ruleName\":\"pipeline_test_config_namestrategy_filter_stage\","
- + "\"ruleNameSpace\":\"paas_soc_namespace\"}],\"modelDesc\":\"测试策略,不要在意\",\"monitor_type\":\"auto\"},"
- + "\"monitor_type\":\"auto\"}";
- //for (int i = 0; i < 1; i++) {
- // JSONObject message = new JSONObject();
- // message.put("key", "message_key " + i);
- // message.put("value", 999 + i);
- // channel.batchAdd(message);
- //}
- channel.batchAdd(new Message(JSONObject.parseObject(alarmData)));
- channel.flush();
- /**
- * 注意:发送之后不能立即退出应用,不然消息可能还没有发出,会发生消息丢失
- */
- Thread.sleep(1000L);
- }
-
- private ISink createSink() {
- KafkaSink kafkaChannel = new KafkaSink();
- kafkaChannel.setTopic(TOPIC);
- kafkaChannel.setEndpoint(END_POINT);
- kafkaChannel.setNameSpace("com.aliyun.dipper.test");
- kafkaChannel.setConfigureName("kafka_channel");
- kafkaChannel.init();
- return kafkaChannel;
- }
-
- private ISource createSource() {
- KafkaSource kafkaChannel = new KafkaSource();
- kafkaChannel.setJsonData(false);
- kafkaChannel.setTopic(TOPIC);
- kafkaChannel.setEndpoint(END_POINT);
- kafkaChannel.setGroupName(GROUP_NAME);
- kafkaChannel.setMaxThread(1);
- kafkaChannel.setNameSpace("com.aliyun.dipper.test");
- kafkaChannel.setConfigureName("kafka_channel");
- kafkaChannel.init();
- return kafkaChannel;
- }
-}
diff --git a/rocketmq-streams-channel-kafka/src/test/resources/log4j.xml b/rocketmq-streams-channel-kafka/src/test/resources/log4j.xml
deleted file mode 100755
index ce34a62..0000000
--- a/rocketmq-streams-channel-kafka/src/test/resources/log4j.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-<!DOCTYPE log4j:configuration SYSTEM "http://toolkit.alibaba-inc.com/dtd/log4j/log4j.dtd">
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
-
- <appender name="Console" class="org.apache.log4j.ConsoleAppender">
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d{ISO8601} %l [%t] %-5p - %m%n%n"/>
- </layout>
- <filter class="org.apache.log4j.varia.LevelRangeFilter">
- <param name="LevelMin" value="INFO"/>
- <param name="LevelMax" value="ERROR"/>
- </filter>
- </appender>
-
- <root>
- <priority value="INFO"/>
- <appender-ref ref="Console"/>
- </root>
-
-</log4j:configuration>
\ No newline at end of file