You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by vesense <gi...@git.apache.org> on 2016/09/13 09:37:16 UTC

[GitHub] storm pull request #1683: STORM-2092: optimize TridentKafkaState batch sendi...

GitHub user vesense opened a pull request:

    https://github.com/apache/storm/pull/1683

    STORM-2092: optimize TridentKafkaState batch sending

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vesense/storm STORM-2092

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1683.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1683
    
----
commit 9888bf6b994a29a2f18ac8126c249dd314ed764b
Author: vesense <be...@163.com>
Date:   2016-09-13T09:22:48Z

    STORM-2092: optimize TridentKafkaState batch sending

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1683: STORM-2092: optimize TridentKafkaState batch sending

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on the issue:

    https://github.com/apache/storm/pull/1683
  
    ci failure is unrelated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1683: STORM-2092: optimize TridentKafkaState batch sendi...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1683


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1683: STORM-2092: optimize TridentKafkaState batch sending

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1683
  
    +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1683: STORM-2092: optimize TridentKafkaState batch sendi...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1683#discussion_r78543845
  
    --- Diff: external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java ---
    @@ -73,30 +74,35 @@ public void prepare(Properties options) {
     
         public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
             String topic = null;
    -        for (TridentTuple tuple : tuples) {
    -            try {
    +        try {
    +            List<Future<RecordMetadata>> futures = new ArrayList<>(tuples.size());
    +            for (TridentTuple tuple : tuples) {
                     topic = topicSelector.getTopic(tuple);
     
                     if(topic != null) {
                         Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
                                 mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple)));
    -                    try {
    -                        result.get();
    -                    } catch (ExecutionException e) {
    -                        String errorMsg = "Could not retrieve result for message with key = "
    -                                + mapper.getKeyFromTuple(tuple) + " from topic = " + topic;
    -                        LOG.error(errorMsg, e);
    -                        throw new FailedException(errorMsg, e);
    -                    }
    +                    futures.add(result);
                     } else {
                         LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
                     }
    -            } catch (Exception ex) {
    -                String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
    -                        + " to topic = " + topic;
    -                LOG.warn(errorMsg, ex);
    -                throw new FailedException(errorMsg, ex);
                 }
    +
    +            for (int i = 0 ; i < futures.size(); i++) {
    +                Future<RecordMetadata> future = futures.get(i);
    +                try {
    +                    future.get();
    +                } catch (ExecutionException e) {
    +                    String errorMsg = "Could not retrieve result for message with key = "
    +                            + mapper.getKeyFromTuple(tuples.get(i)) + " from topic = " + topic;
    +                    LOG.error(errorMsg, e);
    +                    throw new FailedException(errorMsg, e);
    --- End diff --
    
    Since it already sends multiple requests, we need to log other errors as well and throw FailedException with summarized message (containing all errors) if any.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1683: STORM-2092: optimize TridentKafkaState batch sendi...

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1683#discussion_r78545097
  
    --- Diff: external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java ---
    @@ -73,30 +74,35 @@ public void prepare(Properties options) {
     
         public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
             String topic = null;
    -        for (TridentTuple tuple : tuples) {
    -            try {
    +        try {
    +            List<Future<RecordMetadata>> futures = new ArrayList<>(tuples.size());
    +            for (TridentTuple tuple : tuples) {
                     topic = topicSelector.getTopic(tuple);
     
                     if(topic != null) {
                         Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
                                 mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple)));
    -                    try {
    -                        result.get();
    -                    } catch (ExecutionException e) {
    -                        String errorMsg = "Could not retrieve result for message with key = "
    -                                + mapper.getKeyFromTuple(tuple) + " from topic = " + topic;
    -                        LOG.error(errorMsg, e);
    -                        throw new FailedException(errorMsg, e);
    -                    }
    +                    futures.add(result);
                     } else {
                         LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
                     }
    -            } catch (Exception ex) {
    -                String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
    -                        + " to topic = " + topic;
    -                LOG.warn(errorMsg, ex);
    -                throw new FailedException(errorMsg, ex);
                 }
    +
    +            for (int i = 0 ; i < futures.size(); i++) {
    +                Future<RecordMetadata> future = futures.get(i);
    +                try {
    +                    future.get();
    +                } catch (ExecutionException e) {
    +                    String errorMsg = "Could not retrieve result for message with key = "
    +                            + mapper.getKeyFromTuple(tuples.get(i)) + " from topic = " + topic;
    +                    LOG.error(errorMsg, e);
    +                    throw new FailedException(errorMsg, e);
    --- End diff --
    
    Will fix


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1683: STORM-2092: optimize TridentKafkaState batch sendi...

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1683#discussion_r78544642
  
    --- Diff: external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java ---
    @@ -73,30 +74,35 @@ public void prepare(Properties options) {
     
         public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
             String topic = null;
    -        for (TridentTuple tuple : tuples) {
    -            try {
    +        try {
    +            List<Future<RecordMetadata>> futures = new ArrayList<>(tuples.size());
    +            for (TridentTuple tuple : tuples) {
                     topic = topicSelector.getTopic(tuple);
     
                     if(topic != null) {
                         Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
                                 mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple)));
    -                    try {
    -                        result.get();
    -                    } catch (ExecutionException e) {
    -                        String errorMsg = "Could not retrieve result for message with key = "
    -                                + mapper.getKeyFromTuple(tuple) + " from topic = " + topic;
    -                        LOG.error(errorMsg, e);
    -                        throw new FailedException(errorMsg, e);
    -                    }
    +                    futures.add(result);
                     } else {
                         LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
                     }
    -            } catch (Exception ex) {
    -                String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
    -                        + " to topic = " + topic;
    -                LOG.warn(errorMsg, ex);
    -                throw new FailedException(errorMsg, ex);
                 }
    +
    +            for (int i = 0 ; i < futures.size(); i++) {
    --- End diff --
    
    see errorMsg tuples.get(i)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1683: STORM-2092: optimize TridentKafkaState batch sendi...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1683#discussion_r78545083
  
    --- Diff: external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java ---
    @@ -73,30 +74,35 @@ public void prepare(Properties options) {
     
         public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
             String topic = null;
    -        for (TridentTuple tuple : tuples) {
    -            try {
    +        try {
    +            List<Future<RecordMetadata>> futures = new ArrayList<>(tuples.size());
    +            for (TridentTuple tuple : tuples) {
                     topic = topicSelector.getTopic(tuple);
     
                     if(topic != null) {
                         Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
                                 mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple)));
    -                    try {
    -                        result.get();
    -                    } catch (ExecutionException e) {
    -                        String errorMsg = "Could not retrieve result for message with key = "
    -                                + mapper.getKeyFromTuple(tuple) + " from topic = " + topic;
    -                        LOG.error(errorMsg, e);
    -                        throw new FailedException(errorMsg, e);
    -                    }
    +                    futures.add(result);
                     } else {
                         LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
                     }
    -            } catch (Exception ex) {
    -                String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
    -                        + " to topic = " + topic;
    -                LOG.warn(errorMsg, ex);
    -                throw new FailedException(errorMsg, ex);
                 }
    +
    +            for (int i = 0 ; i < futures.size(); i++) {
    --- End diff --
    
    Yeah you're right. I missed it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1683: STORM-2092: optimize TridentKafkaState batch sendi...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1683#discussion_r78543513
  
    --- Diff: external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java ---
    @@ -73,30 +74,35 @@ public void prepare(Properties options) {
     
         public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
             String topic = null;
    -        for (TridentTuple tuple : tuples) {
    -            try {
    +        try {
    +            List<Future<RecordMetadata>> futures = new ArrayList<>(tuples.size());
    +            for (TridentTuple tuple : tuples) {
                     topic = topicSelector.getTopic(tuple);
     
                     if(topic != null) {
                         Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
                                 mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple)));
    -                    try {
    -                        result.get();
    -                    } catch (ExecutionException e) {
    -                        String errorMsg = "Could not retrieve result for message with key = "
    -                                + mapper.getKeyFromTuple(tuple) + " from topic = " + topic;
    -                        LOG.error(errorMsg, e);
    -                        throw new FailedException(errorMsg, e);
    -                    }
    +                    futures.add(result);
                     } else {
                         LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
                     }
    -            } catch (Exception ex) {
    -                String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
    -                        + " to topic = " + topic;
    -                LOG.warn(errorMsg, ex);
    -                throw new FailedException(errorMsg, ex);
                 }
    +
    +            for (int i = 0 ; i < futures.size(); i++) {
    --- End diff --
    
    for (Future<RecordMetadata> future : futures) {


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1683: STORM-2092: optimize TridentKafkaState batch sendi...

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1683#discussion_r79296575
  
    --- Diff: external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java ---
    @@ -73,30 +74,35 @@ public void prepare(Properties options) {
     
         public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
             String topic = null;
    -        for (TridentTuple tuple : tuples) {
    -            try {
    +        try {
    +            List<Future<RecordMetadata>> futures = new ArrayList<>(tuples.size());
    +            for (TridentTuple tuple : tuples) {
                     topic = topicSelector.getTopic(tuple);
     
                     if(topic != null) {
                         Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
                                 mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple)));
    -                    try {
    -                        result.get();
    -                    } catch (ExecutionException e) {
    -                        String errorMsg = "Could not retrieve result for message with key = "
    -                                + mapper.getKeyFromTuple(tuple) + " from topic = " + topic;
    -                        LOG.error(errorMsg, e);
    -                        throw new FailedException(errorMsg, e);
    -                    }
    +                    futures.add(result);
                     } else {
                         LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
                     }
    -            } catch (Exception ex) {
    -                String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
    -                        + " to topic = " + topic;
    -                LOG.warn(errorMsg, ex);
    -                throw new FailedException(errorMsg, ex);
                 }
    +
    +            for (int i = 0 ; i < futures.size(); i++) {
    +                Future<RecordMetadata> future = futures.get(i);
    +                try {
    +                    future.get();
    +                } catch (ExecutionException e) {
    +                    String errorMsg = "Could not retrieve result for message with key = "
    +                            + mapper.getKeyFromTuple(tuples.get(i)) + " from topic = " + topic;
    +                    LOG.error(errorMsg, e);
    +                    throw new FailedException(errorMsg, e);
    --- End diff --
    
    @HeartSaVioR Fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---