You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by 731635762 <mr...@qq.com> on 2018/11/20 01:53:38 UTC

storm-kafka-client Repeated consumption last message

Hi All, I use storm+kafka+protobuf to build my stream process system. 

The problem is KafkaTridentSpoutOpaque Repeated consumption the last message. I really want  just once consumer for every message in kafka.

This are my some detail:
---------------------java dependency----------

        storm-kafka-client    1.2.2
        storm-core    1.2.2
        kafka_2.10    0.10.2.0


--------------------component-----------------

kafka_2.12-2.0.0
apache-storm-1.2.2



------------------------------------- build KafkaTridentSpoutOpaque instance code ----------------------


protected static KafkaSpoutConfig<String, byte[]> newKafkaSpoutConfig(String bootstrapServers, String topic) {
        KafkaSpoutConfig.Builder<String, byte[]> builder = new KafkaSpoutConfig.Builder<>(bootstrapServers, topic);

        return builder.setProp(ConsumerConfig.GROUP_ID_CONFIG, "stormKafkaSpoutGroup")
                .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
                .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
                .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
                .setRecordTranslator(new JustValueFunc(), new Fields("str"))
                .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
                .setProcessingGuarantee(AT_MOST_ONCE)
                .build();
    }
    
    private static KafkaTridentSpoutOpaque<String, byte[]> newKafkaTridentSpoutOpaque(KafkaSpoutConfig<String, byte[]> spoutConfig) {
        return new KafkaTridentSpoutOpaque<>(spoutConfig);
    }

    private static class JustValueFunc implements Func<ConsumerRecord<String, byte[]>, List<Object>>, Serializable {
        @Override
        public List<Object> apply(ConsumerRecord<String, byte[]> record) {
            Values res = null;
            try {
                res = new Values(PbMiddlewareTransfer.Record.parseFrom(record.value()));
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
            return res;
        }
    }


-------------------------------there is my topology code ---------------------------------------
public static void main(String[] args) throws Exception {
        StormTopology topology = getTridentTopology();
        Config conf = new Config();
        conf.setNumWorkers(20);
        conf.setMaxSpoutPending(5000);
        StormSubmitter.submitTopology("storm-kafka-client-spout-test", conf, topology);
    }

    public static StormTopology getTridentTopology() {
        final TridentTopology tridentTopology = new TridentTopology();

        KafkaSpoutConfig<String, byte[]> spoutConfig = newKafkaSpoutConfig("192.168.0.202:9092", "test-2");
        ITridentDataSource spout = newKafkaTridentSpoutOpaque(spoutConfig);

        final Stream spoutStream = tridentTopology.newStream("spout", spout).parallelismHint(1);

        spoutStream.each(spoutStream.getOutputFields(), new Debug("##### fastest driver"));

        return tridentTopology.build();
    }




------------------------------log-------------------------
./6702/worker.log:2018-11-19 20:19:12.418 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:19:12 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:19:25.908 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:19:25 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:01.997 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:30.591 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:42.960 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:42 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:44.477 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:44 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:47.501 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:47 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:48.516 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:48 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:54.072 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:54 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:22:01.171 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:22:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:22:27.380 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:22:27 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:03.992 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:03 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:14.893 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:14 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:20.955 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:20 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:25.495 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:25 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:47.978 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:47 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:56.440 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:56 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:24:33.534 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:24:33 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:27:35.588 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:27:35 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:28:23.784 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:28:23 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:28:48.155 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:28:48 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:29:12.218 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:29:12 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:31:15.597 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:31:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:31:30.720 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:31:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:33:07.871 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:33:07 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:33:27.889 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:33:27 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:34:34.126 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:34:34 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:35:36.615 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:35:36 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:39:31.282 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:39:31 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:40:15.364 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:40:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:15.565 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:16.570 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:16 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:54.130 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:54 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:43:30.303 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:43:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:44:26.049 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:44:26 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:52:43.618 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:52:43 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:54:01.904 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:54:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:55:13.448 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:55:13 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:59:15.220 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:59:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1


I just produce one message put in kafka, above message is my topology output message. it is except just one, but there are many. And it will repeted about half hour.


Any help is appreciated. Thanks, 
ChenBo

Re: storm-kafka-client Repeated consumption last message

Posted by Shaik Asifullah <s....@gmail.com>.
Hi,
Setting max spout pending value very high causes this. Try setting with low
value say 1. My topology works good with 10 as max spout pending. More info
here http://storm.apache.org/releases/1.1.2/FAQ.html

Thanks,
Shaik Asifullah



On Tue, 20 Nov 2018, 07:23 731635762 <mrchenbo@qq.com wrote:

> Hi All, I use storm+kafka+protobuf to build my stream process system.
>
> The problem is KafkaTridentSpoutOpaque Repeated consumption the last
> message. I really want  just once consumer for every message in kafka.
>
> This are my some detail:
> ---------------------java dependency----------
>         storm-kafka-client    1.2.2
>         storm-core    1.2.2
>         kafka_2.10    0.10.2.0
>
> --------------------component-----------------
> kafka_2.12-2.0.0
> apache-storm-1.2.2
>
> ------------------------------------- build KafkaTridentSpoutOpaque
> instance code ----------------------
>
> protected static KafkaSpoutConfig<String, byte[]>
> newKafkaSpoutConfig(String bootstrapServers, String topic) {
>         KafkaSpoutConfig.Builder<String, byte[]> builder = new
> KafkaSpoutConfig.Builder<>(bootstrapServers, topic);
>
>         return builder.setProp(ConsumerConfig.GROUP_ID_CONFIG,
> "stormKafkaSpoutGroup")
>                 .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
>                 .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> "org.apache.kafka.common.serialization.StringDeserializer")
>                 .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>                 .setRecordTranslator(new JustValueFunc(), new
> Fields("str"))
>                 .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
>                 .setProcessingGuarantee(AT_MOST_ONCE)
>                 .build();
>     }
>
>     private static KafkaTridentSpoutOpaque<String, byte[]>
> newKafkaTridentSpoutOpaque(KafkaSpoutConfig<String, byte[]> spoutConfig) {
>         return new KafkaTridentSpoutOpaque<>(spoutConfig);
>     }
>
>     private static class JustValueFunc implements
> Func<ConsumerRecord<String, byte[]>, List<Object>>, Serializable {
>         @Override
>         public List<Object> apply(ConsumerRecord<String, byte[]> record) {
>             Values res = null;
>             try {
>                 res = new
> Values(PbMiddlewareTransfer.Record.parseFrom(record.value()));
>             } catch (InvalidProtocolBufferException e) {
>                 e.printStackTrace();
>             }
>             return res;
>         }
>     }
>
> -------------------------------there is my topology code
> ---------------------------------------
> public static void main(String[] args) throws Exception {
>         StormTopology topology = getTridentTopology();
>         Config conf = new Config();
>         conf.setNumWorkers(20);
>         conf.setMaxSpoutPending(5000);
>         StormSubmitter.submitTopology("storm-kafka-client-spout-test",
> conf, topology);
>     }
>
>     public static StormTopology getTridentTopology() {
>         final TridentTopology tridentTopology = new TridentTopology();
>
>         KafkaSpoutConfig<String, byte[]> spoutConfig =
> newKafkaSpoutConfig("192.168.0.202:9092", "test-2");
>         ITridentDataSource spout = newKafkaTridentSpoutOpaque(spoutConfig);
>
>         final Stream spoutStream = tridentTopology.newStream("spout",
> spout).parallelismHint(1);
>
>         spoutStream.each(spoutStream.getOutputFields(), new Debug("#####
> fastest driver"));
>
>         return tridentTopology.build();
>     }
>
>
> ------------------------------log-------------------------
> ./6702/worker.log:2018-11-19 20:19:12.418 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:19:12 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:19:25.908 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:19:25 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:01.997 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:20:01 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:30.591 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:20:30 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:42.960 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:20:42 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:44.477 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:20:44 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:47.501 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:20:47 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:48.516 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:20:48 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:54.072 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:20:54 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:22:01.171 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:22:01 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:22:27.380 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:22:27 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:03.992 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:23:03 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:14.893 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:23:14 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:20.955 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:23:20 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:25.495 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:23:25 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:47.978 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:23:47 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:56.440 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:23:56 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:24:33.534 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:24:33 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:27:35.588 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:27:35 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:28:23.784 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:28:23 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:28:48.155 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:28:48 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:29:12.218 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:29:12 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:31:15.597 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:31:15 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:31:30.720 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:31:30 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:33:07.871 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:33:07 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:33:27.889 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:33:27 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:34:34.126 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:34:34 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:35:36.615 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:35:36 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:39:31.282 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:39:31 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:40:15.364 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:40:15 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:41:15.565 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:41:15 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:41:16.570 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:41:16 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:41:54.130 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:41:54 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:43:30.303 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:43:30 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:44:26.049 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:44:26 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:52:43.618 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:52:43 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:54:01.904 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:54:01 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:55:13.448 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:55:13 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:59:15.220 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:59:15 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
>
> I just produce one message put in kafka, above message is my topology
> output message. it is except just one, but there are many. And it will
> repeted about half hour.
>
> Any help is appreciated.
>
> Thanks,
> ChenBo
>
>