You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/12/07 01:46:12 UTC

[rocketmq-streams] branch main updated: remove useless test

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new 74517f0  remove useless test
74517f0 is described below

commit 74517f05257e72f965862a170388e63f7e290aa2
Author: duhenglucky <du...@apache.org>
AuthorDate: Tue Dec 7 09:45:42 2021 +0800

    remove useless test
---
 pom.xml                                            |   6 -
 rocketmq-streams-channel-kafka/pom.xml             |  48 ----
 .../apache/rocketmq/streams/kafka/KafkaSplit.java  |  55 -----
 .../streams/kafka/KakfaChannelBuilder.java         |  89 -------
 .../rocketmq/streams/kafka/sink/KafkaSink.java     | 226 -----------------
 .../rocketmq/streams/kafka/source/KafkaSource.java | 275 ---------------------
 .../rocketmq/streams/kafka/KafkaChannelTest.java   | 104 --------
 .../src/test/resources/log4j.xml                   |  36 ---
 8 files changed, 839 deletions(-)

diff --git a/pom.xml b/pom.xml
index 93fbda0..55934bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,7 +57,6 @@
         <module>rocketmq-streams-channel-syslog</module>
         <module>rocketmq-streams-channel-es</module>
         <module>rocketmq-streams-runner</module>
-        <module>rocketmq-streams-channel-kafka</module>
     </modules>
 
     <properties>
@@ -312,11 +311,6 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
-                <artifactId>rocketmq-streams-channel-kafka</artifactId>
-                <version>${project.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-streams-connectors</artifactId>
                 <version>${project.version}</version>
             </dependency>
diff --git a/rocketmq-streams-channel-kafka/pom.xml b/rocketmq-streams-channel-kafka/pom.xml
deleted file mode 100644
index 309170d..0000000
--- a/rocketmq-streams-channel-kafka/pom.xml
+++ /dev/null
@@ -1,48 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>rocketmq-streams</artifactId>
-        <groupId>org.apache.rocketmq</groupId>
-        <version>1.0.0-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>rocketmq-streams-channel-kafka</artifactId>
-    <name>ROCKETMQ STREAMS :: channel-kafka</name>
-
-    <properties>
-        <maven.compiler.source>8</maven.compiler.source>
-        <maven.compiler.target>8</maven.compiler.target>
-    </properties>
-
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.12</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-streams-commons</artifactId>
-        </dependency>
-    </dependencies>
-</project>
\ No newline at end of file
diff --git a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/KafkaSplit.java b/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/KafkaSplit.java
deleted file mode 100644
index 20fa396..0000000
--- a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/KafkaSplit.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.kafka;
-
-import com.alibaba.fastjson.JSONObject;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.rocketmq.streams.common.channel.split.ISplit;
-import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
-
-public class KafkaSplit extends BasedConfigurable implements ISplit<KafkaSplit, PartitionInfo> {
-    protected PartitionInfo partitionInfo;
-    protected String topic;
-    protected int partition;
-
-    @Override
-    public String getQueueId() {
-        return partition + "";
-    }
-
-
-    @Override
-    public PartitionInfo getQueue() {
-        return partitionInfo;
-    }
-
-    @Override
-    public int compareTo(KafkaSplit o) {
-        return partition - o.partition;
-    }
-
-    @Override
-    protected void getJsonObject(JSONObject jsonObject) {
-        super.getJsonObject(jsonObject);
-        this.partitionInfo = new PartitionInfo(topic, partition, null, null, null);
-    }
-
-    public KafkaSplit(PartitionInfo partitionInfo) {
-        this.partitionInfo = partitionInfo;
-        this.partition = partitionInfo.partition();
-    }
-}
diff --git a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/KakfaChannelBuilder.java b/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/KakfaChannelBuilder.java
deleted file mode 100644
index 2d947c5..0000000
--- a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/KakfaChannelBuilder.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.kafka;
-
-import com.alibaba.fastjson.JSONObject;
-import com.google.auto.service.AutoService;
-import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.model.ServiceName;
-import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
-import org.apache.rocketmq.streams.kafka.sink.KafkaSink;
-import org.apache.rocketmq.streams.kafka.source.KafkaSource;
-
-@AutoService(IChannelBuilder.class)
-@ServiceName(value = KakfaChannelBuilder.TYPE, aliasName = "KafkaSource")
-public class KakfaChannelBuilder extends AbstractSupportShuffleChannelBuilder {
-    public static final String TYPE = "kakfa";
-    /**
-     * @param namespace
-     * @param name
-     * @param properties
-     * @return
-     */
-    @Override
-    public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
-
-        KafkaSource kafkaSource = (KafkaSource) ConfigurableUtil.create(KafkaSource.class.getName(), namespace, name, createFormatProperty(properties), null);
-        return kafkaSource;
-    }
-
-    @Override
-    public String getType() {
-        return TYPE;
-    }
-
-    @Override
-    public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
-        KafkaSink kafkaSink = (KafkaSink)ConfigurableUtil.create(KafkaSink.class.getName(), namespace, name, createFormatProperty(properties), null);
-        return kafkaSink;
-    }
-
-    /**
-     * 创建标准的属性文件
-     *
-     * @param properties
-     * @return
-     */
-    protected JSONObject createFormatProperty(Properties properties) {
-        JSONObject formatProperties = new JSONObject();
-        for (Object object : properties.keySet()) {
-            String key = (String)object;
-            if ("type".equals(key)) {
-                continue;
-            }
-            formatProperties.put(key, properties.getProperty(key));
-        }
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "groupName", "group.id");
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "groupName", "consumerGroup");
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "endpoint", "bootstrap.servers");
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "maxThread", "thread.max.count");
-        return formatProperties;
-    }
-
-    @Override
-    public ISink createBySource(ISource pipelineSource) {
-        KafkaSource kafkaSource = (KafkaSource)pipelineSource;
-        String topic = kafkaSource.getTopic();
-        KafkaSink sink = new KafkaSink(kafkaSource.getEndpoint(),topic);
-        return sink;
-    }
-}
diff --git a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/sink/KafkaSink.java b/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/sink/KafkaSink.java
deleted file mode 100644
index 3ae0967..0000000
--- a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/sink/KafkaSink.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.kafka.sink;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.ListTopicsResult;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
-import org.apache.rocketmq.streams.common.channel.split.ISplit;
-import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.kafka.KafkaSplit;
-import org.apache.rocketmq.streams.kafka.source.KafkaSource;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-
-public class KafkaSink extends AbstractSupportShuffleSink {
-    private static final Log LOG = LogFactory.getLog(KafkaSource.class);
-
-    private static final String PREFIX = "dipper.upgrade.channel.kafak.envkey";
-
-    private transient List<KafkaConsumer<String, String>> consumers = new ArrayList<>();
-    private transient KafkaProducer kafkaProducer;
-    private transient Properties props;
-
-    private static int maxPollRecords = 100;
-    private volatile transient boolean stop = false;
-    protected String topic;
-    @ENVDependence
-    private String endpoint;
-    private int sessionTimeout = 30000;
-    private transient ExecutorService executorService = null;
-    private transient ConcurrentLinkedQueue<ConsumerRecord<String, String>> itemQueue = new ConcurrentLinkedQueue<ConsumerRecord<String, String>>();
-
-    public KafkaSink() {}
-
-    public KafkaSink(String endpoint, String topic) {
-        this.endpoint = endpoint;
-        this.topic = topic;
-    }
-
-    @Override
-    protected boolean initConfigurable() {
-        Properties props = new Properties();
-        props.put("bootstrap.servers", endpoint);
-        props.put("enable.auto.commit", false);
-        props.put("acks", "1");  //针对kafka producer可以做一些参数优化
-        props.put("linger.ms", 1);
-        props.put("batch.size", 16384);
-        props.put("session.timeout.ms", String.valueOf(sessionTimeout));
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        this.props = props;
-
-        kafkaProducer = new KafkaProducer<String, String>(props);
-
-        return true;
-    }
-
-    @Override
-    protected boolean batchInsert(List<IMessage> messages) {
-        if (messages == null) {
-            return true;
-        }
-        for (IMessage message : messages) {
-            putMessage2Mq(message);
-        }
-        return true;
-    }
-
-    protected void destroyProducer() {
-        if (kafkaProducer != null) {
-            try {
-                kafkaProducer.close();
-            } catch (Throwable t) {
-                if (LOG.isWarnEnabled()) {
-                    LOG.warn(t.getMessage(), t);
-                }
-            }
-        }
-    }
-
-    @Override
-    public void destroy() {
-        super.destroy();
-        stop = true;
-        destroyProducer();
-    }
-
-    @Override
-    protected void createTopicIfNotExist(int splitNum) {
-        AdminClient adminClient = null;
-        try {
-
-            Properties properties = new Properties();
-            properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
-                endpoint);
-
-            adminClient = AdminClient.create(properties);
-            ListTopicsResult result = adminClient.listTopics();
-            Set<String> topics = result.names().get();
-            if (topics.contains(topic)) {
-                return;
-            }
-            NewTopic newTopic = new NewTopic(topic, splitNum, (short)1);
-            adminClient.createTopics(Arrays.asList(newTopic));
-
-            LOG.info("创建主题成功:" + topic);
-        } catch (Exception ex) {
-            ex.printStackTrace();
-        } finally {
-            if (adminClient != null) {
-                adminClient.close();
-            }
-        }
-    }
-
-    @Override
-    public String getShuffleTopicFieldName() {
-        return "topic";
-    }
-
-    @Override
-    public List<ISplit> getSplitList() {
-        List<PartitionInfo> partitionInfos = kafkaProducer.partitionsFor(topic);
-        List<ISplit> splits = new ArrayList<>();
-        for (PartitionInfo partitionInfo : partitionInfos) {
-            splits.add(new KafkaSplit(partitionInfo));
-        }
-        return splits;
-    }
-
-    protected boolean putMessage2Mq(IMessage fieldName2Value) {
-        try {
-
-            LOG.info(String.format("topic=%s, record=%s", topic, fieldName2Value.getMessageValue().toString()));
-            ProducerRecord<String, String> records =
-                new ProducerRecord<String, String>(topic, fieldName2Value.getMessageValue().toString());
-            kafkaProducer.send(records, new Callback() {
-
-                @Override
-                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-                    if (e != null) {
-                        //                    	LOG.error("send kafka message error!topic=" + topic, e);
-                    } else {
-                        //                    	LOG.info(String.format("send success topic=%s, record=%s", topic, jsonObject.toJSONString()));
-                    }
-                }
-            });
-        } catch (Exception e) {
-            LOG.error("send message error:" + fieldName2Value.getMessageValue().toString(), e);
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    public int getSplitNum() {
-        return getSplitList().size();
-    }
-
-    public static int getMaxPollRecords() {
-        return maxPollRecords;
-    }
-
-    public boolean isStop() {
-        return stop;
-    }
-
-    public void setStop(boolean stop) {
-        this.stop = stop;
-    }
-
-    public String getEndpoint() {
-        return endpoint;
-    }
-
-    public void setEndpoint(String endpoint) {
-        this.endpoint = endpoint;
-    }
-
-    public int getSessionTimeout() {
-        return sessionTimeout;
-    }
-
-    public void setSessionTimeout(int sessionTimeout) {
-        this.sessionTimeout = sessionTimeout;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-}
diff --git a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/source/KafkaSource.java b/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/source/KafkaSource.java
deleted file mode 100644
index 7814a89..0000000
--- a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/source/KafkaSource.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.kafka.source;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import com.alibaba.fastjson.JSONObject;
-
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.rocketmq.streams.common.channel.source.AbstractSupportShuffleSource;
-import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-
-public class KafkaSource extends AbstractSupportShuffleSource {
-
-    private static final Log LOG = LogFactory.getLog(KafkaSource.class);
-
-    private static final String PREFIX = "dipper.upgrade.channel.kafak.envkey";
-
-    private transient List<KafkaConsumer<String, String>> consumers = new ArrayList<>();
-    private transient KafkaProducer kafkaProducer;
-    private transient Properties props;
-
-    private static int maxPollRecords = 100;
-    private volatile transient boolean stop = false;
-
-    @ENVDependence
-    private String endpoint;
-    private int sessionTimeout = 30000;
-    private transient ExecutorService executorService = null;
-    private transient ConcurrentLinkedQueue<ConsumerRecord<String, String>> itemQueue =
-        new ConcurrentLinkedQueue<ConsumerRecord<String, String>>();
-    //private int QUEUE_MAX_SIZE = 5000000;
-
-    public KafkaSource() {
-    }
-
-    public KafkaSource(String endpoint, String topic, String groupName) {
-        this.endpoint = endpoint;
-        this.topic = topic;
-        this.groupName = groupName;
-    }
-
-    @Override
-    protected boolean initConfigurable() {
-        Properties props = new Properties();
-        props.put("bootstrap.servers", endpoint);
-        props.put("group.id", groupName);//这里是GroupA或者GroupB
-        props.put("enable.auto.commit", false);
-        props.put("acks", "1");  //针对kafka producer可以做一些参数优化
-        props.put("linger.ms", 1);
-        props.put("batch.size", 16384);
-        props.put("auto.commit.interval.ms", String.valueOf(checkpointTime));
-        props.put("session.timeout.ms", String.valueOf(sessionTimeout));
-        props.put("max.poll.records", String.valueOf(getMaxFetchLogGroupSize()));
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("auto.offset.reset", "earliest");
-        props.put("key.serializer.encoding",getEncoding());
-        props.put("value.serializer.encoding",getEncoding());
-        this.props = props;
-
-        kafkaProducer = new KafkaProducer<String, String>(props);
-
-        return true;
-    }
-
-    @Override
-    protected boolean startSource() {
-        try {
-            //kafka多线程消费会报“KafkaConsumer is not safe for multi-threaded access”,所以改成单线程拉取,多线程处理
-            WorkerFunc workerFunc = new WorkerFunc();
-            ExecutorService executorService = new ThreadPoolExecutor(getMaxThread(), getMaxThread(),
-                0L, TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<Runnable>(1000));
-            executorService.execute(workerFunc);
-            //处理
-
-        } catch (Exception e) {
-            setInitSuccess(false);
-            LOG.error(e.getMessage(), e);
-            destroy();
-            throw new RuntimeException(" start kafka channel error " + topic);
-        }
-        return true;
-    }
-
-    protected class WorkerFunc implements Runnable {
-        @Override
-        public void run() {
-            List<String> topicList = new ArrayList<>();
-            topicList.add(topic);
-            KafkaConsumer<String,String> consumer = new KafkaConsumer(props);
-            consumer.subscribe(topicList);
-            long lastUpgrade = System.currentTimeMillis();
-            List<PartitionInfo> oldPartitionInfos = consumer.partitionsFor(topic);
-            while (true && !stop) {
-                try {
-
-                    ConsumerRecords<String, String> records = consumer.poll(1000);
-                    List<PartitionInfo> newPartitionInfos = consumer.partitionsFor(topic);
-                    messageQueueChanged(oldPartitionInfos, newPartitionInfos);
-                    Set<String> queueIds = new HashSet<>();
-                    for (ConsumerRecord<String,String> record : records) {
-                        try {
-                            //		                    LOG.info(String.format("recordValue=%s", record.value()));
-                            queueIds.add(record.partition() + "");
-                            if(getHeaderFieldNames()!=null){
-                                Map<String,Object> parameters=new HashMap<>();
-                                parameters.put("messageKey",record.key());
-                                parameters.put("topic",record.topic());
-                                parameters.put("partition",record.partition());
-                                parameters.put("offset",record.offset());
-                                parameters.put("timestamp",record.timestamp());
-                            }
-                            JSONObject msg = create(record.value(),null);
-                            Message message = createMessage(msg, record.partition() + "", record.offset() + "", false);
-                            message.getHeader().setOffsetIsLong(true);
-                            executeMessage(message);
-                            doReceiveMessage(msg, false, record.partition() + "", record.offset() + "");
-                        } catch (Exception e) {
-                            LOG.error(e.getMessage(), e);
-                        }
-                    }
-                    if ((System.currentTimeMillis() - lastUpgrade) > checkpointTime) {
-                        sendCheckpoint(queueIds);
-                        consumer.commitAsync();
-                        lastUpgrade = System.currentTimeMillis();
-                    }
-                } catch (Exception e) {
-                    LOG.error(e.getMessage(), e);
-                }
-
-            }
-        }
-    }
-
-    public void messageQueueChanged(List<PartitionInfo> old, List<PartitionInfo> newPartitions) {
-        Set<String> queueIds = new HashSet<>();
-        Set<String> mqAll = new HashSet<>();
-        Set<String> mqDivided = new HashSet<>();
-        for (PartitionInfo partitionInfo : old) {
-            mqAll.add(partitionInfo.partition() + "");
-        }
-        for (PartitionInfo partitionInfo : newPartitions) {
-            mqDivided.add(partitionInfo.partition() + "");
-        }
-        for (String messageQueue : mqAll) {
-            if (!mqDivided.contains(messageQueue)) {
-                //ProcessQueue pq = this.processQueueTable.remove(messageQueue);
-                //if (pq != null) {
-                //    pq.setDropped(true);
-                //    log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, messageQueue);
-                //}
-                queueIds.add(messageQueue);
-            }
-        }
-        Set<String> newQueueIds = new HashSet<>();
-        for (String messageQueue : mqDivided) {
-            if (!mqAll.contains(messageQueue)) {
-                newQueueIds.add(messageQueue + "");
-            }
-        }
-        removeSplit(queueIds);
-        addNewSplit(newQueueIds);
-
-    }
-
-    @Override
-    public boolean supportNewSplitFind() {
-        return true;
-    }
-
-    @Override
-    public boolean supportRemoveSplitFind() {
-        return true;
-    }
-
-    @Override
-    public boolean supportOffsetRest() {
-        return false;
-    }
-
-    @Override
-    protected boolean isNotDataSplit(String queueId) {
-        return false;
-    }
-
-    protected void destroyConsumer() {
-        if (consumers != null && consumers.size() > 0) {
-            for (KafkaConsumer consumer : consumers) {
-                if (consumer != null) {
-                    consumer.close();
-                }
-            }
-
-        }
-    }
-
-    @Override
-    public void destroy() {
-        super.destroy();
-        stop = true;
-        destroyConsumer();
-    }
-
-    public static int getMaxPollRecords() {
-        return maxPollRecords;
-    }
-
-    public boolean isStop() {
-        return stop;
-    }
-
-    public void setStop(boolean stop) {
-        this.stop = stop;
-    }
-
-    public String getEndpoint() {
-        return endpoint;
-    }
-
-    public void setEndpoint(String endpoint) {
-        this.endpoint = endpoint;
-    }
-
-    @Override
-    public void setMaxFetchLogGroupSize(int size) {
-        super.setMaxFetchLogGroupSize(size);
-        maxPollRecords = size;
-    }
-
-    public int getSessionTimeout() {
-        return sessionTimeout;
-    }
-
-    public void setSessionTimeout(int sessionTimeout) {
-        this.sessionTimeout = sessionTimeout;
-    }
-
-}
diff --git a/rocketmq-streams-channel-kafka/src/test/java/org/apache/rocketmq/streams/kafka/KafkaChannelTest.java b/rocketmq-streams-channel-kafka/src/test/java/org/apache/rocketmq/streams/kafka/KafkaChannelTest.java
deleted file mode 100644
index cfaa1a6..0000000
--- a/rocketmq-streams-channel-kafka/src/test/java/org/apache/rocketmq/streams/kafka/KafkaChannelTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.kafka;
-
-import com.alibaba.fastjson.JSONObject;
-
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.rocketmq.streams.kafka.sink.KafkaSink;
-import org.apache.rocketmq.streams.kafka.source.KafkaSource;
-import org.junit.Test;
-
-/**
- * 本机搭建kafka服务端参考:https://www.cnblogs.com/BlueSkyyj/p/11425998.html
- **/
-public class KafkaChannelTest {
-    private static final String END_POINT = "47.94.238.133:31013";
-    private static final String TOPIC = "real_time_kafka_topic";
-    //数据任务处理后topic:es_index_test,实时策略消息topic:real_time_kafka_topic
-    private static final String GROUP_NAME = "test-090";
-
-    @Test
-    public void testKafkaReceive() throws InterruptedException {
-        ISource channel = createSource();
-        channel.start((message, context) -> {
-            System.out.println(message.getMessageBody());
-            return message;
-        });
-        while (true) {
-            Thread.sleep(1000L);
-        }
-    }
-
-    @Test
-    public void testSendMessageToKafka() throws InterruptedException {
-        ISink channel = createSink();
-        String data = "{\"destIp\":\"10.252.30.235\",\"protocol\":\"tcp\",\"destPort\":\"10000\",\"sourceIp\":\"10"
-            + ".232.1.1\",\"level\":\"high\",\"srcPort\":\"8000\",\"monitor_type\":\"auto\"}";
-        //
-        String alarmData = "{\"destIp\":\"10.252.30.235\",\"fixMethod\":\"测试策略,不用处理\",\"level\":\"urgent\","
-            + "\"modelId\":\"666\",\"alarmTime\":\"2020-03-12 19:35:31\",\"alarmName\":\"完整流程测试策略名称\","
-            + "\"srcPort\":\"8000\",\"source\":\"完整数据流串测-es通道-不要删除\",\"protocol\":\"tcp\",\"monitorType\":\"hand\","
-            + "\"destPort\":\"10000\",\"sourceIp\":\"10.232.1.1\","
-            + "\"fireRules\":[{\"ruleName\":\"pipeline_test_config_namestrategy_filter_stage\","
-            + "\"ruleNameSpace\":\"paas_soc_namespace\"}],\"modelDesc\":\"测试策略,不要在意\",\"detail\":{\"destIp\":\"10.252"
-            + ".30.235\",\"fixMethod\":\"测试策略,不用处理\",\"level\":\"urgent\",\"modelId\":\"666\","
-            + "\"alarmTime\":\"2020-03-12 19:35:31\",\"alarmName\":\"完整流程测试策略名称\",\"srcPort\":\"8000\","
-            + "\"source\":\"完整数据流串测-es通道-不要删除\",\"protocol\":\"tcp\",\"monitorType\":\"hand\",\"destPort\":\"10000\","
-            + "\"sourceIp\":\"10.232.1.1\","
-            + "\"fireRules\":[{\"ruleName\":\"pipeline_test_config_namestrategy_filter_stage\","
-            + "\"ruleNameSpace\":\"paas_soc_namespace\"}],\"modelDesc\":\"测试策略,不要在意\",\"monitor_type\":\"auto\"},"
-            + "\"monitor_type\":\"auto\"}";
-        //for (int i = 0; i < 1; i++) {
-        //    JSONObject message = new JSONObject();
-        //    message.put("key", "message_key " + i);
-        //    message.put("value", 999 + i);
-        //    channel.batchAdd(message);
-        //}
-        channel.batchAdd(new Message(JSONObject.parseObject(alarmData)));
-        channel.flush();
-        /**
-         * 注意:发送之后不能立即退出应用,不然消息可能还没有发出,会发生消息丢失
-         */
-        Thread.sleep(1000L);
-    }
-
-    private ISink createSink() {
-        KafkaSink kafkaChannel = new KafkaSink();
-        kafkaChannel.setTopic(TOPIC);
-        kafkaChannel.setEndpoint(END_POINT);
-        kafkaChannel.setNameSpace("com.aliyun.dipper.test");
-        kafkaChannel.setConfigureName("kafka_channel");
-        kafkaChannel.init();
-        return kafkaChannel;
-    }
-
-    private ISource createSource() {
-        KafkaSource kafkaChannel = new KafkaSource();
-        kafkaChannel.setJsonData(false);
-        kafkaChannel.setTopic(TOPIC);
-        kafkaChannel.setEndpoint(END_POINT);
-        kafkaChannel.setGroupName(GROUP_NAME);
-        kafkaChannel.setMaxThread(1);
-        kafkaChannel.setNameSpace("com.aliyun.dipper.test");
-        kafkaChannel.setConfigureName("kafka_channel");
-        kafkaChannel.init();
-        return kafkaChannel;
-    }
-}
diff --git a/rocketmq-streams-channel-kafka/src/test/resources/log4j.xml b/rocketmq-streams-channel-kafka/src/test/resources/log4j.xml
deleted file mode 100755
index ce34a62..0000000
--- a/rocketmq-streams-channel-kafka/src/test/resources/log4j.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-<!DOCTYPE log4j:configuration SYSTEM "http://toolkit.alibaba-inc.com/dtd/log4j/log4j.dtd">
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
-
-    <appender name="Console" class="org.apache.log4j.ConsoleAppender">
-        <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d{ISO8601} %l [%t] %-5p - %m%n%n"/>
-        </layout>
-        <filter class="org.apache.log4j.varia.LevelRangeFilter">
-            <param name="LevelMin" value="INFO"/>
-            <param name="LevelMax" value="ERROR"/>
-        </filter>
-    </appender>
-
-    <root>
-        <priority value="INFO"/>
-        <appender-ref ref="Console"/>
-    </root>
-
-</log4j:configuration>
\ No newline at end of file