Posted to dev@storm.apache.org by srdo <gi...@git.apache.org> on 2018/01/27 19:21:20 UTC

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

GitHub user srdo opened a pull request:

    https://github.com/apache/storm/pull/2538

     STORM-2913: Add metadata to at-most-once and at-least-once commits 

    https://issues.apache.org/jira/browse/STORM-2913
    
    This builds on STORM-2914.
    
    I believe we can resolve STORM-2913 by committing metadata in all processing guarantee modes, rather than just AT_LEAST_ONCE. This change simply adds metadata to the AT_MOST_ONCE and NONE commit statements.
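    The sketch below illustrates the idea roughly. It builds a per-spout metadata string (topology id, task id, thread name) and contrasts two behaviors: a simplified version of the old one, where only at-least-once commits carried metadata, and the proposed one, where every mode carries it. It avoids Kafka and Jackson on purpose. The JSON is assembled by hand instead of with an `ObjectMapper`, and the `taskId`/`threadName` field names are assumptions.

    ```java
    import java.util.Locale;

    final class CommitMetadataSketch {

        enum ProcessingGuarantee { AT_LEAST_ONCE, AT_MOST_ONCE, NONE }

        // Hand-rolled JSON standing in for Jackson; assumes the inputs need no escaping.
        static String buildMetadata(String topologyId, int taskId, String threadName) {
            return String.format(Locale.ROOT,
                "{\"topologyId\":\"%s\",\"taskId\":%d,\"threadName\":\"%s\"}",
                topologyId, taskId, threadName);
        }

        // Simplified pre-PR behavior: only at-least-once commits carried spout metadata.
        static String metadataBeforeChange(ProcessingGuarantee guarantee, String spoutMetadata) {
            return guarantee == ProcessingGuarantee.AT_LEAST_ONCE ? spoutMetadata : "";
        }

        // Proposed behavior: the guarantee no longer matters, every commit carries the metadata.
        static String metadataAfterChange(ProcessingGuarantee guarantee, String spoutMetadata) {
            return spoutMetadata;
        }
    }
    ```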

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-2913

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2538.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2538
    
----
commit 3e09a1d718f8d40424e7ac5d574efe7a74706cf8
Author: Stig Rohde Døssing <sr...@...>
Date:   2018-01-27T18:22:07Z

    STORM-2914: Implement ProcessingGuarantee.NONE in the spout instead of using enable.auto.commit

commit 99c7a30bf38c523a1f97411e7a42dbb44017f9f0
Author: Stig Rohde Døssing <sr...@...>
Date:   2018-01-27T14:15:45Z

    STORM-2913: Add metadata to at-most-once and at-least-once commits

----


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165854399
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Copyright 2018 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout.internal;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import java.io.IOException;
    +import java.util.Map;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
    +import org.apache.storm.task.TopologyContext;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    +
    +    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    +    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
    +    // Metadata information to commit to Kafka. It is unique per spout instance.
    +    private final String commitMetadata;
    +    private final ProcessingGuarantee processingGuarantee;
    +    private final TopologyContext context;
    +
    +    /**
    +     * Create a manager with the given context.
    +     */
    +    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
    +        this.context = context;
    +        try {
    +            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
    +                context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
    +            this.processingGuarantee = processingGuarantee;
    +        } catch (JsonProcessingException e) {
    +            LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    /**
    +     * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology.
    +     *
    +     * @param tp The topic partition the commit metadata belongs to.
    +     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
    +     * @param offsetManagers The offset managers.
    +     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
    +     */
    +    public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset,
    +        Map<TopicPartition, OffsetManager> offsetManagers) {
    +        try {
    +            if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE
    +                && offsetManagers.containsKey(tp)
    +                && offsetManagers.get(tp).hasCommitted()) {
    +                return true;
    +            }
    +
    +            final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
    +            return committedMetadata.getTopologyId().equals(context.getStormId());
    +        } catch (IOException e) {
    +            LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit "
    --- End diff --
    
    Yes, I'll update this message


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165853833
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    --- End diff --
    
    It's in the internal package, so if we made it package-private, the spout couldn't see it. I think it's up to users to recognize that a class in a package called "internal" probably shouldn't be used directly. Once we start looking at Java 9 modularization, we can make sure not to export this package.
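    Here is a rough sketch of the Java 9 approach mentioned above. A module descriptor can export the public spout package while leaving `internal` unexported. Code in other modules then cannot compile against the public classes in that package. The module name is hypothetical, because this thread does not define one, and the `requires` clauses are omitted.

    ```java
    // Hypothetical module-info.java; the module name is an assumption and `requires` clauses are omitted.
    module storm.kafka.client.sketch {
        // Public API that topologies are expected to use.
        exports org.apache.storm.kafka.spout;
        // org.apache.storm.kafka.spout.internal is deliberately not exported.
    }
    ```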


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165857620
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    +
    +    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    +    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
    +    // Metadata information to commit to Kafka. It is unique per spout instance.
    +    private final String commitMetadata;
    +    private final ProcessingGuarantee processingGuarantee;
    +    private final TopologyContext context;
    +
    +    /**
    +     * Create a manager with the given context.
    +     */
    +    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
    +        this.context = context;
    +        try {
    +            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
    --- End diff --
    
    It's bad practice to create objects in the constructor; I avoid it myself. However, it's already done in a lot of places in Storm, so I'm OK with leaving it as is.
    
    Another reason this could be bad is if someone wants to subclass this class. If we leave it like this, perhaps the class should be final.
    
    These are just suggestions. You can leave it as is or go with either of them.
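    The sketch below shows one general hazard of mixing constructor work with subclassing, using hypothetical classes. If a constructor calls an overridable method, the subclass override runs before the subclass's own field initializers. Declaring the class `final`, or limiting the constructor to plain field assignment, rules this out. The `CommitMetadataManager` constructor in this diff does not call overridable methods, so this only illustrates the general risk.

    ```java
    final class ConstructorHazardDemo {

        static class Base {
            final String seenDuringConstruction;

            Base() {
                // Calls an overridable method before any subclass field is initialized.
                seenDuringConstruction = describe();
            }

            String describe() {
                return "base";
            }
        }

        static class Derived extends Base {
            // Non-final on purpose: a final String constant would be inlined and hide the problem.
            private String label = "derived";

            @Override
            String describe() {
                return "label=" + label;
            }
        }
    }
    ```

    During construction, `Derived.describe()` sees `label` as `null`. After construction, it sees `"derived"`.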


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165857479
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -142,7 +139,7 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC
             offsetManagers = new HashMap<>();
             emitted = new HashSet<>();
             waitingToEmit = new HashMap<>();
    -        setCommitMetadata(context);
    +        commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
    --- End diff --
    
    ok


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165853160
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    +
    +    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    +    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
    +    // Metadata information to commit to Kafka. It is unique per spout instance.
    +    private final String commitMetadata;
    +    private final ProcessingGuarantee processingGuarantee;
    +    private final TopologyContext context;
    +
    +    /**
    +     * Create a manager with the given context.
    +     */
    +    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
    +        this.context = context;
    +        try {
    +            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
    +                context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
    +            this.processingGuarantee = processingGuarantee;
    +        } catch (JsonProcessingException e) {
    +            LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    /**
    +     * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology.
    +     *
    +     * @param tp The topic partition the commit metadata belongs to.
    +     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
    +     * @param offsetManagers The offset managers.
    +     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
    +     */
    +    public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset,
    +        Map<TopicPartition, OffsetManager> offsetManagers) {
    +        try {
    +            if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE
    +                && offsetManagers.containsKey(tp)
    +                && offsetManagers.get(tp).hasCommitted()) {
    +                return true;
    +            }
    +
    +            final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
    +            return committedMetadata.getTopologyId().equals(context.getStormId());
    +        } catch (IOException e) {
    +            LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit "
    --- End diff --
    
    We should note, either in the README or in this message, that this WARN is expected the first time a user starts this version of the spout on partitions whose last commit to Kafka was made by an older version of the spout.
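    This is a simplified sketch of the check in `isOffsetCommittedByThisTopology` and of why the warning should occur only once per partition. In at-least-once mode, a partition whose offset manager reports `hasCommitted()` is trusted directly. Otherwise the committed metadata is parsed and its topology id compared. Legacy metadata fails to parse, so the method warns and returns false. Once the spout has committed with the new format, later checks parse cleanly. Assumptions: partitions are plain strings, a `Map<String, Boolean>` stands in for the offset managers, and a regex replaces Jackson.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    final class CommittedByTopologyCheck {

        enum ProcessingGuarantee { AT_LEAST_ONCE, AT_MOST_ONCE, NONE }

        // Stand-in for JSON deserialization: extracts "topologyId":"..." or fails to match.
        private static final Pattern TOPOLOGY_ID =
            Pattern.compile("\"topologyId\"\\s*:\\s*\"([^\"]*)\"");

        final List<String> warnings = new ArrayList<>();
        private final String stormId;
        private final ProcessingGuarantee guarantee;

        CommittedByTopologyCheck(String stormId, ProcessingGuarantee guarantee) {
            this.stormId = stormId;
            this.guarantee = guarantee;
        }

        boolean isOffsetCommittedByThisTopology(String partition, String committedMetadata,
                                                Map<String, Boolean> committedByThisSpout) {
            // Shortcut: this spout instance has already committed to the partition itself.
            if (guarantee == ProcessingGuarantee.AT_LEAST_ONCE
                && committedByThisSpout.getOrDefault(partition, false)) {
                return true;
            }
            Matcher m = TOPOLOGY_ID.matcher(committedMetadata);
            if (!m.find()) {
                // Expected when the last commit predates the metadata format (or came from another process).
                warnings.add("Failed to deserialize expected commit metadata [" + committedMetadata + "] for " + partition);
                return false;
            }
            return m.group(1).equals(stormId);
        }
    }
    ```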


---

[GitHub] storm issue #2538: STORM-2913: Add metadata to at-most-once and at-least-onc...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2538
  
    @srdo there is a discrepancy between the title of this pull request and the title of the associated JIRA. What problem are you trying to solve in this patch: adding metadata, or removing warnings?
    
    If the goal is to stop the recurring WARN messages, in my opinion the obvious thing to do is to simply not log the message unless the processing guarantee is AT_LEAST_ONCE. This would be a one-line change.
    
    Can you please also clarify why metadata needs to be added when running the spout in AT-MOST-ONCE mode? Also, I think it is quite dangerous to try to mimic Kafka's behavior by simply calling commitAsync(...) in auto.commit mode. Besides the WARN messages, is there any other problem with letting Kafka handle everything in auto.commit mode?



---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165853784
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -142,7 +139,7 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC
             offsetManagers = new HashMap<>();
             emitted = new HashSet<>();
             waitingToEmit = new HashMap<>();
    -        setCommitMetadata(context);
    +        commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
    --- End diff --
    
    I think we should wait and do it if someone needs it. Introducing more extension points than we need is likely to cause us headaches down the road. Once an extension point is public, it's harder for us to change, because we have to consider that other people may be implementing the interface.


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165857469
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -75,8 +75,9 @@ public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetad
                 final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
                 return committedMetadata.getTopologyId().equals(context.getStormId());
             } catch (IOException e) {
    -            LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit "
    -                + "for this topic-partition was done using an earlier version of Storm. "
    +            LOG.warn("Failed to deserialize expected commit metadata [{}]."
    +                + " This error is expected to occur once per partition, if the last commit to each partition"
    +                + " was by an earlier version of the KafkaSpout, or by something other than the KafkaSpout. "
    --- End diff --
    
    was done by an earlier version ... or by a process other than the KafkaSpout
    



---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165854506
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -396,7 +361,10 @@ private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
                     numPolledRecords);
                 if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                     //Commit polled records immediately to ensure delivery is at-most-once.
    -                kafkaConsumer.commitSync();
    +                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
    +                    createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
    +                kafkaConsumer.commitSync(offsetsToCommit);
    +                LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
    --- End diff --
    
    I don't mind adding it, but is this information useful to the user?
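    The hunk above calls `createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment())` before `commitSync(Map)`. That helper presumably pairs each assigned partition's current position with the spout's metadata, so the at-most-once commit carries metadata too. The sketch below shows that shape with hypothetical stand-ins. `PositionSource` plays the role of `KafkaConsumer.position(TopicPartition)`, partitions are strings, and `OffsetWithMetadata` mirrors `OffsetAndMetadata`. It is not the method from the patch.

    ```java
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    final class OffsetsToCommitSketch {

        /** Hypothetical stand-in for KafkaConsumer.position(TopicPartition). */
        interface PositionSource {
            long position(String partition);
        }

        /** Hypothetical stand-in for Kafka's OffsetAndMetadata. */
        static final class OffsetWithMetadata {
            final long offset;
            final String metadata;

            OffsetWithMetadata(long offset, String metadata) {
                this.offset = offset;
                this.metadata = metadata;
            }
        }

        // For each assigned partition, commit the next offset to fetch (the position) plus the spout metadata.
        static Map<String, OffsetWithMetadata> offsetsForConsumedRecords(
                Collection<String> assignment, PositionSource consumer, String spoutMetadata) {
            Map<String, OffsetWithMetadata> offsets = new HashMap<>();
            for (String partition : assignment) {
                offsets.put(partition, new OffsetWithMetadata(consumer.position(partition), spoutMetadata));
            }
            return offsets;
        }
    }
    ```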


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165857745
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    --- End diff --
    
    ok


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165859312
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    +
    +    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    +    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
    +    // Metadata information to commit to Kafka. It is unique per spout instance.
    +    private final String commitMetadata;
    +    private final ProcessingGuarantee processingGuarantee;
    +    private final TopologyContext context;
    +
    +    /**
    +     * Create a manager with the given context.
    +     */
    +    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
    +        this.context = context;
    +        try {
    +            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
    --- End diff --
    
    I'm happy to declare the class final, hopefully that'll discourage people from using the class directly if the internal package hint doesn't do the trick.
    
    Regarding creating objects in the constructor, I agree with you in the general case (DI is great), but for pure data objects like CommitMetadata I'm not sure what the harm is.


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165854507
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -469,11 +437,14 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
                 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
             } else {
                 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
    -            if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset)
    +            if (isAtLeastOnceProcessing()
    +                && committedOffset != null 
    +                && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, offsetManagers)
    --- End diff --
    
    Good point
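    The changed condition relies on `&&` short-circuiting. When the spout is not in at-least-once mode, the metadata check is never evaluated, and neither is its possible deserialization warning. Below is a minimal sketch with hypothetical names. It mirrors only the first three operands shown in the hunk, and the counter exists only to make the skipped call observable.

    ```java
    final class ShortCircuitGate {

        int metadataChecks = 0;

        // Hypothetical stand-in for commitMetadataManager.isOffsetCommittedByThisTopology(...).
        private boolean metadataSaysCommittedByUs(String committedMetadata) {
            metadataChecks++;
            return committedMetadata.contains("\"topologyId\":\"topo-1\"");
        }

        // Mirrors: isAtLeastOnceProcessing() && committedOffset != null && isOffsetCommittedByThisTopology(...)
        boolean evaluateCondition(boolean atLeastOnce, String committedMetadata) {
            return atLeastOnce
                && committedMetadata != null
                && metadataSaysCommittedByUs(committedMetadata);
        }
    }
    ```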


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165852989
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    +
    +    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    +    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
    +    // Metadata information to commit to Kafka. It is unique per spout instance.
    +    private final String commitMetadata;
    +    private final ProcessingGuarantee processingGuarantee;
    +    private final TopologyContext context;
    +
    +    /**
    +     * Create a manager with the given context.
    +     */
    +    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
    +        this.context = context;
    +        try {
    +            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
    --- End diff --
    
    Ideally commitMetadata would be passed in the constructor to facilitate unit testing. We could add a factory method in this class itself, along these lines:
    
    ```java
    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee, String commitMetadata)
    ```
    
    ```java
    public static CommitMetadataManager newInstance(TopologyContext context, ProcessingGuarantee processingGuarantee) {
        return new CommitMetadataManager(context, processingGuarantee, JSON_MAPPER.writeValueAsString(new CommitMetadata(context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName())));
    }
    ```
    
    with the JsonProcessingException handled inside the factory method.
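    Here is a self-contained sketch of this suggestion, under some assumptions. `Context` is a hypothetical stand-in for `TopologyContext`, and the processing guarantee parameter is left out for brevity. The metadata JSON is built by hand instead of with Jackson, so there is no checked `JsonProcessingException` here; a comment marks where that handling would go. The class is `final`, as discussed above. The constructor only assigns fields, so a test can pass a fixed metadata string, while production code calls `newInstance`.

    ```java
    import java.util.Locale;

    final class InjectedMetadataManager {

        /** Hypothetical stand-in for TopologyContext. */
        interface Context {
            String stormId();
            int taskId();
        }

        private final Context context;
        private final String commitMetadata;

        // Plain field assignment only: easy to call from tests with a fixed metadata string.
        InjectedMetadataManager(Context context, String commitMetadata) {
            this.context = context;
            this.commitMetadata = commitMetadata;
        }

        // Derives the metadata; with Jackson, JsonProcessingException would be caught and wrapped here.
        static InjectedMetadataManager newInstance(Context context) {
            String metadata = String.format(Locale.ROOT,
                "{\"topologyId\":\"%s\",\"taskId\":%d,\"threadName\":\"%s\"}",
                context.stormId(), context.taskId(), Thread.currentThread().getName());
            return new InjectedMetadataManager(context, metadata);
        }

        String getCommitMetadata() {
            return commitMetadata;
        }
    }
    ```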


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2538


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165852670
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -396,7 +361,10 @@ private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
                     numPolledRecords);
                 if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                     //Commit polled records immediately to ensure delivery is at-most-once.
    -                kafkaConsumer.commitSync();
    +                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
    +                    createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
    +                kafkaConsumer.commitSync(offsetsToCommit);
    --- End diff --
    
    createFetchedOffsetsMetadata


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165852864
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    +
    +    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    +    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
    +    // Metadata information to commit to Kafka. It is unique per spout instance.
    --- End diff --
    
    Do you mean CommitMetadataManager.class ?


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165859143
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Copyright 2018 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout.internal;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import java.io.IOException;
    +import java.util.Map;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
    +import org.apache.storm.task.TopologyContext;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    +
    +    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    --- End diff --
    
    My impression is that declaring one global ObjectMapper is fine unless we need differently configured ObjectMappers. See https://stackoverflow.com/a/3909846/8845188 (the responder is one of the Jackson developers).


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165854760
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Copyright 2018 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout.internal;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import java.io.IOException;
    +import java.util.Map;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
    +import org.apache.storm.task.TopologyContext;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    +
    +    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    +    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
    +    // Metadata information to commit to Kafka. It is unique per spout instance.
    +    private final String commitMetadata;
    +    private final ProcessingGuarantee processingGuarantee;
    +    private final TopologyContext context;
    +
    +    /**
    +     * Create a manager with the given context.
    +     */
    +    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
    +        this.context = context;
    +        try {
    +            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
    +                context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
    +            this.processingGuarantee = processingGuarantee;
    +        } catch (JsonProcessingException e) {
    +            LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    /**
    +     * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology.
    +     *
    +     * @param tp The topic partition the commit metadata belongs to.
    +     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
    +     * @param offsetManagers The offset managers.
    +     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
    +     */
    +    public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset,
    +        Map<TopicPartition, OffsetManager> offsetManagers) {
    +        try {
    +            if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE
    +                && offsetManagers.containsKey(tp)
    +                && offsetManagers.get(tp).hasCommitted()) {
    +                return true;
    +            }
    +
    +            final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
    +            return committedMetadata.getTopologyId().equals(context.getStormId());
    +        } catch (IOException e) {
    +            LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit "
    --- End diff --
    
    I've tried to update this so it's a little clearer about when it will be printed.
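    The check can be sketched as follows. The sketch assumes two things: the metadata is the flat JSON object written by the constructor, and the catch block (truncated in the diff) returns false after logging. Jackson's readValue is replaced by a hypothetical regex-based extractTopologyId so the sketch stays JDK-only. That helper only understands the flat format. Anything it cannot parse, such as metadata written by some other consumer, counts as "not committed by this topology".

    ```java
    import java.util.Optional;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    final class CommitOwnershipCheck {
        enum ProcessingGuarantee { AT_LEAST_ONCE, AT_MOST_ONCE, NONE }

        // Matches "topologyId":"<value>" in a flat JSON object (a stand-in for Jackson).
        private static final Pattern TOPOLOGY_ID =
            Pattern.compile("\"topologyId\"\\s*:\\s*\"([^\"]*)\"");

        // Hypothetical parser: empty if the metadata is missing or not in the expected format.
        static Optional<String> extractTopologyId(String metadata) {
            if (metadata == null) {
                return Optional.empty();
            }
            Matcher m = TOPOLOGY_ID.matcher(metadata);
            return m.find() ? Optional.of(m.group(1)) : Optional.empty();
        }

        static boolean isOffsetCommittedByThisTopology(ProcessingGuarantee guarantee,
                boolean offsetManagerHasCommitted, String committedMetadata, String thisTopologyId) {
            // With at-least-once, an offset manager that has already committed proves
            // ownership without inspecting the metadata.
            if (guarantee == ProcessingGuarantee.AT_LEAST_ONCE && offsetManagerHasCommitted) {
                return true;
            }
            // Otherwise compare the topology id stored in the commit metadata;
            // unparseable metadata is treated as "not ours".
            return extractTopologyId(committedMetadata)
                .map(thisTopologyId::equals)
                .orElse(false);
        }
    }
    ```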


---

[GitHub] storm issue #2538: STORM-2913: Add metadata to at-most-once and at-least-onc...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2538
  
    +1.
    
    Once squashed, it is good to merge as far as I am concerned. Thanks a lot @srdo.


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165857816
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -453,37 +460,37 @@ public Builder(String bootstrapServers, Subscription subscription) {
             return builder;
         }
     
    -    private static void setAutoCommitMode(Builder<?, ?> builder) {
    +    private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?> builder) {
             if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
    -            throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
    -                + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
    +            throw new IllegalStateException("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
    +                + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
             }
    -        if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
    -            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    -        } else {
    -            String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
    -            if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
    -                if (autoOffsetResetPolicy == null) {
    -                    /*
    -                    If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
    -                    for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
    -                    error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer 
    -                    requests an offset that was deleted.
    -                     */
    -                    builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    -                } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
    -                    LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
    -                        + " Some messages may be skipped.");
    -                }
    -            } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
    -                if (autoOffsetResetPolicy != null
    -                    && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
    -                    LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
    -                        + " Some messages may be processed more than once.");
    -                }
    +        String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
    +        if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
    +            if (autoOffsetResetPolicy == null) {
    +                /*
    +                 * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
    +                 * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
    +                 * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
    +                 * requests an offset that was deleted.
    +                 */
    +                LOG.info("Setting consumer property '{}' to 'earliest' to ensure at-least-once processing",
    --- End diff --
    
    NIT: I would write "Kafka consumer", but it's not a deal breaker.


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165852835
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Copyright 2018 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout.internal;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import java.io.IOException;
    +import java.util.Map;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
    +import org.apache.storm.task.TopologyContext;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    +
    +    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    --- End diff --
    
    Do we want one ObjectMapper for all the KafkaSpout instances (executors), or one per executor? This will share it across all the instances. Perhaps we should have one per instance.


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165854105
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Copyright 2018 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout.internal;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import java.io.IOException;
    +import java.util.Map;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
    +import org.apache.storm.task.TopologyContext;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    +
    +    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    --- End diff --
    
    ObjectMappers are thread-safe. I don't see anything in the documentation about whether ObjectMapper uses internal locks, but this post suggests it doesn't: https://stackoverflow.com/questions/18611565/how-do-i-correctly-reuse-jackson-objectmapper#comment27462917_18618918. A quick Google search also suggests that it is fine to use ObjectMapper as a singleton.


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165852696
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -396,7 +361,10 @@ private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
                     numPolledRecords);
                 if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                     //Commit polled records immediately to ensure delivery is at-most-once.
    -                kafkaConsumer.commitSync();
    +                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
    +                    createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
    +                kafkaConsumer.commitSync(offsetsToCommit);
    +                LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
    --- End diff --
    
    Suggest changing the message to "Committed offsets {} synchronously to Kafka".


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165853240
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -311,7 +273,10 @@ public void nextTuple() {
                     if (isAtLeastOnceProcessing()) {
                         commitOffsetsForAckedTuples(kafkaConsumer.assignment());
                     } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NONE) {
    -                    commitConsumedOffsets(kafkaConsumer.assignment());
    +                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
    +                        createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
    +                    kafkaConsumer.commitAsync(offsetsToCommit, null);
    --- End diff --
    
    Suggest renaming this to createFetchedOffsetsMetadata.


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165858606
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Copyright 2018 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout.internal;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import java.io.IOException;
    +import java.util.Map;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
    +import org.apache.storm.task.TopologyContext;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    +
    +    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    +    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
    +    // Metadata information to commit to Kafka. It is unique per spout instance.
    --- End diff --
    
    Right, I misunderstood. I thought you were talking about the comment in L38 (i.e., that you wanted it changed to "... It is unique per CommitMetadataManager"). Will fix.


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165853341
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Copyright 2018 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout.internal;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import java.io.IOException;
    +import java.util.Map;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
    +import org.apache.storm.task.TopologyContext;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    --- End diff --
    
    All the methods and constructors in this class should be package-private.


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165853798
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -396,7 +361,10 @@ private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
                     numPolledRecords);
                 if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                     //Commit polled records immediately to ensure delivery is at-most-once.
    -                kafkaConsumer.commitSync();
    +                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
    +                    createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
    +                kafkaConsumer.commitSync(offsetsToCommit);
    --- End diff --
    
    Will rename


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165857829
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -453,37 +460,37 @@ public Builder(String bootstrapServers, Subscription subscription) {
             return builder;
         }
     
    -    private static void setAutoCommitMode(Builder<?, ?> builder) {
    +    private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?> builder) {
             if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
    -            throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
    -                + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
    +            throw new IllegalStateException("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
    +                + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
             }
    -        if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
    -            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    -        } else {
    -            String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
    -            if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
    -                if (autoOffsetResetPolicy == null) {
    -                    /*
    -                    If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
    -                    for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
    -                    error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer 
    -                    requests an offset that was deleted.
    -                     */
    -                    builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    -                } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
    -                    LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
    -                        + " Some messages may be skipped.");
    -                }
    -            } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
    -                if (autoOffsetResetPolicy != null
    -                    && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
    -                    LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
    -                        + " Some messages may be processed more than once.");
    -                }
    +        String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
    +        if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
    +            if (autoOffsetResetPolicy == null) {
    +                /*
    +                 * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
    +                 * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
    +                 * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
    +                 * requests an offset that was deleted.
    +                 */
    +                LOG.info("Setting consumer property '{}' to 'earliest' to ensure at-least-once processing",
    +                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
    +                builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    +            } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
    +                LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
    +                    + " Some messages may be skipped.");
    +            }
    +        } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
    +            if (autoOffsetResetPolicy != null
    +                && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
    +                LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
    +                    + " Some messages may be processed more than once.");
                 }
    -            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
             }
    +        LOG.info("Setting consumer property '{}' to 'false', because the spout does not support auto-commit",
    --- End diff --
    
    NIT: I would write "Kafka consumer", but it's not a deal breaker.
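    The rules in the reworked setKafkaPropsForProcessingGuarantee can be sketched without Kafka on the classpath. The sketch makes three assumptions: the consumer properties are a plain Map<String, Object>, warnings are returned in a list instead of logged, and the truncated tail of the diff sets enable.auto.commit to false, as the LOG.info message suggests. The keys are the values of ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG and ConsumerConfig.AUTO_OFFSET_RESET_CONFIG. The class and method names are illustrative.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    final class ProcessingGuaranteeProps {
        enum ProcessingGuarantee { AT_LEAST_ONCE, AT_MOST_ONCE, NONE }

        static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
        static final String AUTO_OFFSET_RESET = "auto.offset.reset";

        // Mutates kafkaProps in place and returns the warnings that would have been logged.
        static List<String> apply(Map<String, Object> kafkaProps, ProcessingGuarantee guarantee) {
            if (kafkaProps.containsKey(ENABLE_AUTO_COMMIT)) {
                throw new IllegalStateException("The Kafka consumer " + ENABLE_AUTO_COMMIT
                    + " setting is not supported; use setProcessingGuarantee instead");
            }
            List<String> warnings = new ArrayList<>();
            String reset = (String) kafkaProps.get(AUTO_OFFSET_RESET);
            if (guarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
                if (reset == null) {
                    // Default to 'earliest' so an out-of-range offset does not skip records.
                    kafkaProps.put(AUTO_OFFSET_RESET, "earliest");
                } else if (!reset.equals("earliest") && !reset.equals("none")) {
                    warnings.add("at-least-once not guaranteed with " + AUTO_OFFSET_RESET + "=" + reset);
                }
            } else if (guarantee == ProcessingGuarantee.AT_MOST_ONCE) {
                if (reset != null && !reset.equals("latest") && !reset.equals("none")) {
                    warnings.add("at-most-once not guaranteed with " + AUTO_OFFSET_RESET + "=" + reset);
                }
            }
            // The spout commits offsets itself in every mode, including NONE.
            kafkaProps.put(ENABLE_AUTO_COMMIT, false);
            return warnings;
        }
    }
    ```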


---

[GitHub] storm issue #2538: STORM-2913: Add metadata to at-most-once and at-least-onc...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2538
  
    @hmcl Rebased.


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165852272
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -142,7 +139,7 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC
             offsetManagers = new HashMap<>();
             emitted = new HashSet<>();
             waitingToEmit = new HashMap<>();
    -        setCommitMetadata(context);
    +        commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
    --- End diff --
    
    I wonder if this should be made available to the KafkaSpout through KafkaSpoutConfig, perhaps using a factory so that it becomes pluggable in case we need to support different behavior in the future.
    
    We could also wait until we actually need it. I just wanted to get your thoughts on it.
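    One possible shape for this is sketched below; it is purely hypothetical, and none of these types exist in the PR. The idea is a Serializable factory held by KafkaSpoutConfig, which needs to be serializable because the spout and its config are Java-serialized when the topology is submitted. A default implementation reproduces the current JSON metadata (field names assumed).

    ```java
    import java.io.Serializable;

    // Hypothetical pluggable producer of commit metadata.
    interface CommitMetadataProvider {
        String commitMetadata();
    }

    // Hypothetical factory that could live on the spout config; it must be
    // Serializable because the config travels with the submitted topology.
    interface CommitMetadataProviderFactory extends Serializable {
        CommitMetadataProvider create(String topologyId, int taskId, String threadName);
    }

    // Default factory reproducing the JSON shape assumed for this PR (no escaping).
    final class DefaultCommitMetadataProviderFactory implements CommitMetadataProviderFactory {
        private static final long serialVersionUID = 1L;

        @Override
        public CommitMetadataProvider create(String topologyId, int taskId, String threadName) {
            String json = "{\"topologyId\":\"" + topologyId + "\",\"taskId\":" + taskId
                + ",\"threadName\":\"" + threadName + "\"}";
            return () -> json;
        }
    }
    ```

    Keeping the factory on the config would let users swap in their own metadata format without touching KafkaSpout. The cost is one more public extension point to maintain.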


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165854458
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +package org.apache.storm.kafka.spout.internal;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import java.io.IOException;
    +import java.util.Map;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
    +import org.apache.storm.task.TopologyContext;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    +
    +    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    +    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
    +    // Metadata information to commit to Kafka. It is unique per spout instance.
    +    private final String commitMetadata;
    +    private final ProcessingGuarantee processingGuarantee;
    +    private final TopologyContext context;
    +
    +    /**
    +     * Create a manager with the given context.
    +     */
    +    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
    +        this.context = context;
    +        try {
    +            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
    --- End diff --
    
    I would agree if CommitMetadata weren't a POJO. It doesn't have any behavior, so why do we need to stub it?
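
To illustrate the point, a behavior-free value class can simply be constructed with known values in a test and compared by value, so there is nothing for a stub to replace. The sketch below is hypothetical. Its field names (topology id, task id, thread name) are assumptions and are not quoted from the PR.

```java
import java.util.Objects;

// Hypothetical value class standing in for the CommitMetadata POJO under discussion.
final class CommitMetadataPojo {
    private final String topologyId;
    private final int taskId;
    private final String threadName;

    CommitMetadataPojo(String topologyId, int taskId, String threadName) {
        this.topologyId = topologyId;
        this.taskId = taskId;
        this.threadName = threadName;
    }

    String getTopologyId() {
        return topologyId;
    }

    int getTaskId() {
        return taskId;
    }

    String getThreadName() {
        return threadName;
    }

    // Value semantics: two instances with the same fields are interchangeable in tests.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof CommitMetadataPojo)) {
            return false;
        }
        CommitMetadataPojo other = (CommitMetadataPojo) o;
        return taskId == other.taskId
            && Objects.equals(topologyId, other.topologyId)
            && Objects.equals(threadName, other.threadName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topologyId, taskId, threadName);
    }
}
```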


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165853046
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -469,11 +437,14 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
                 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
             } else {
                 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
    -            if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset)
    +            if (isAtLeastOnceProcessing()
    +                && committedOffset != null 
    +                && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, offsetManagers)
    --- End diff --
    
    Consider passing Collections.unmodifiableMap(offsetManagers) here instead, so the manager can read the offset managers but not modify them.
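
Here is a small sketch of what the suggestion buys, using hypothetical names (`ReadOnlyArgumentDemo`, string partition keys) in place of the spout's real types. `Collections.unmodifiableMap` returns a read-only view. The callee can still read the map, but any attempt to mutate it throws `UnsupportedOperationException`. Because it is a view rather than a copy, wrapping is cheap, and later changes made by the owner remain visible through it.

```java
import java.util.Map;

// Hypothetical demo class; the real call passes Map<TopicPartition, OffsetManager>.
final class ReadOnlyArgumentDemo {
    private ReadOnlyArgumentDemo() {
    }

    // Stand-in for a method that should only read the map, such as
    // isOffsetCommittedByThisTopology(tp, committedOffset, offsetManagers).
    static boolean containsPartition(Map<String, Long> offsetManagers, String partition) {
        return offsetManagers.containsKey(partition);
    }

    // Tries to mutate the map; returns false if it was handed an unmodifiable view.
    static boolean tryClear(Map<String, Long> map) {
        try {
            map.clear();
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }
}
```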


---

[GitHub] storm issue #2538: STORM-2913: Add metadata to at-most-once and at-least-onc...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2538
  
    @srdo 
    Thanks for the great work. I'd rather not review the two PRs, since @hmcl has already given a nice review. If you would like to merge the two yourself, please replace storm-kafka-client 1.1.x/1.0.x with 1.x afterwards. If you would rather let me do it, please let me know. Thanks in advance!


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165857516
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +package org.apache.storm.kafka.spout.internal;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import java.io.IOException;
    +import java.util.Map;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
    +import org.apache.storm.task.TopologyContext;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    +
    +    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    --- End diff --
    
    The only open question is performance. I am OK with it staying like this; I just wanted to bring it up. If not for performance, in the extreme case there would be no harm in creating a single ObjectMapper in a utility class and using it across the entire codebase.


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165854164
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +package org.apache.storm.kafka.spout.internal;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import java.io.IOException;
    +import java.util.Map;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
    +import org.apache.storm.task.TopologyContext;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    +
    +    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    +    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
    +    // Metadata information to commit to Kafka. It is unique per spout instance.
    --- End diff --
    
    No, the metadata is the same if you create two CommitMetadataManagers from the same spout instance. 
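
The following sketch shows why the string is stable. It assumes the metadata is derived only from the topology id, the task id and the name of the creating thread. `SpoutContextSketch` is a hypothetical stand-in for `TopologyContext`. The JSON is built by hand (without escaping) instead of with Jackson, to keep the example dependency-free.

```java
// Hypothetical stand-in for the parts of TopologyContext the metadata depends on.
final class SpoutContextSketch {
    final String topologyId;
    final int taskId;

    SpoutContextSketch(String topologyId, int taskId) {
        this.topologyId = topologyId;
        this.taskId = taskId;
    }
}

// Hypothetical manager: its metadata is a pure function of the context and the
// calling thread, so two managers created the same way produce the same string.
final class MetadataFromContext {
    private final String commitMetadata;

    MetadataFromContext(SpoutContextSketch context) {
        // Hand-built JSON for illustration only; no escaping is performed.
        this.commitMetadata = "{\"topologyId\":\"" + context.topologyId
            + "\",\"taskId\":" + context.taskId
            + ",\"threadName\":\"" + Thread.currentThread().getName() + "\"}";
    }

    String getCommitMetadata() {
        return commitMetadata;
    }
}
```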


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165857949
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
    @@ -0,0 +1,90 @@
    +package org.apache.storm.kafka.spout.internal;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import java.io.IOException;
    +import java.util.Map;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.KafkaSpout;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
    +import org.apache.storm.task.TopologyContext;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Generates and reads commit metadata.
    + */
    +public class CommitMetadataManager {
    +
    +    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    +    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
    +    // Metadata information to commit to Kafka. It is unique per spout instance.
    --- End diff --
    
    I disagree. The logger should be named after the class that is logging the message, or one of its superclasses. There is no class called CommitMetadata in the codebase, so why should we have a logger called CommitMetadata?
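
For reference, here is the convention being argued for. The sketch uses `java.util.logging` so it has no dependencies. With SLF4J, the equivalent is `LoggerFactory.getLogger(CommitMetadataManager.class)`. Logging frameworks configure levels and appenders by logger name, so naming a logger after the class that emits the messages keeps package- and class-level configuration predictable. `NamedLoggerSketch` is a hypothetical class.

```java
import java.util.logging.Logger;

// java.util.logging stands in for SLF4J; with SLF4J the equivalent is
// LoggerFactory.getLogger(CommitMetadataManager.class).
final class NamedLoggerSketch {
    // The logger is named after the class that actually emits the messages.
    private static final Logger LOG = Logger.getLogger(NamedLoggerSketch.class.getName());

    private NamedLoggerSketch() {
    }

    static String loggerName() {
        return LOG.getName();
    }
}
```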


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165852737
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -469,11 +437,14 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
                 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
             } else {
                 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
    -            if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset)
    +            if (isAtLeastOnceProcessing()
    +                && committedOffset != null 
    +                && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, offsetManagers)
                     && committedOffset.offset() > record.offset()) {
                     // Ensures that after a topology with this id is started, the consumer fetch
                     // position never falls behind the committed offset (STORM-2844)
    -                throw new IllegalStateException("Attempting to emit a message that has already been committed.");
    +                throw new IllegalStateException("Attempting to emit a message that has already been committed."
    +                    + " This should never occur in at-least-once mode.");
    --- End diff --
    
    for at-least-once semantics.
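
Below is a simplified sketch of the guard with the suggested wording. It uses hypothetical names (`EmitGuardSketch`, `GuaranteeMode`) and plain values in place of the consumer, `OffsetAndMetadata` and `CommitMetadataManager`. As in the diff, the check only applies under at-least-once. In that mode, emitting a record below an offset this topology has already committed indicates a bug. The other modes skip the check.

```java
// Hypothetical enum mirroring the processing guarantees discussed in this PR.
enum GuaranteeMode { AT_LEAST_ONCE, AT_MOST_ONCE, NONE }

final class EmitGuardSketch {
    private EmitGuardSketch() {
    }

    /**
     * Throws if, under at-least-once, the record lies below an offset this topology
     * already committed. committedOffset is null when nothing has been committed yet.
     */
    static void checkNotAlreadyCommitted(GuaranteeMode mode, Long committedOffset,
            boolean committedByThisTopology, long recordOffset) {
        if (mode == GuaranteeMode.AT_LEAST_ONCE
                && committedOffset != null
                && committedByThisTopology
                && committedOffset > recordOffset) {
            throw new IllegalStateException("Attempting to emit a message that has already been committed."
                + " This should never occur for at-least-once semantics.");
        }
    }
}
```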


---