Posted to dev@storm.apache.org by haohui <gi...@git.apache.org> on 2015/11/19 22:34:31 UTC

[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

GitHub user haohui opened a pull request:

    https://github.com/apache/storm/pull/894

    STORM-1220. Avoid double copying in the Kafka spout.

    Currently the Kafka spout performs an extra copy for all payloads in Kafka when integrating with Storm. This PR proposes to avoid the extra copy to improve performance.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/haohui/storm STORM-1220

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/894.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #894
    
----
commit 0850c7a3059bec664524d842ea98aecc5e19889f
Author: Haohui Mai <wh...@apache.org>
Date:   2015-11-19T21:32:20Z

    STORM-1220. Avoid double copying in the Kafka spout.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/894#issuecomment-159705658
  
    +1



[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/894#discussion_r45529315
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/RawScheme.java ---
    @@ -18,11 +18,13 @@
     package backtype.storm.spout;
     
     import backtype.storm.tuple.Fields;
    +
    +import java.nio.ByteBuffer;
     import java.util.List;
     import static backtype.storm.utils.Utils.tuple;
     
     public class RawScheme implements Scheme {
    -    public List<Object> deserialize(byte[] ser) {
    +    public List<Object> deserialize(ByteBuffer ser) {
    --- End diff --
    
    @arunmahadevan Good point. We can use `ByteBuffer.array()` to keep the same signature.
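
One caveat with `ByteBuffer.array()` is worth illustrating: it returns the whole backing array and ignores the buffer's position and limit. The sketch below is not taken from the patch; the class `BackingArrayDemo` and the helper `remainingBytes` are hypothetical names used only for illustration. It shows a buffer that exposes just part of a larger array, and a helper that copies only the readable window.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of Storm or of this pull request.
class BackingArrayDemo {
    // Copies only the readable window [position, limit) into a new array.
    // duplicate() is used so the caller's position is left untouched.
    static byte[] remainingBytes(ByteBuffer buf) {
        byte[] out = new byte[buf.remaining()];
        buf.duplicate().get(out);
        return out;
    }

    public static void main(String[] args) {
        byte[] backing = "headerPAYLOAD".getBytes(StandardCharsets.US_ASCII);
        // The buffer exposes only the last 7 bytes of the 13-byte array.
        ByteBuffer payload = ByteBuffer.wrap(backing, 6, 7);

        // array() hands back the full backing array, including "header".
        System.out.println(payload.array().length);
        // The helper returns just the bytes between position and limit.
        System.out.println(new String(remainingBytes(payload), StandardCharsets.US_ASCII));
    }
}
```

So `array()` only preserves the old `byte[]` contents when the buffer covers its entire backing array; otherwise the offset and length have to travel with it, or a copy is needed.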



[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/894#discussion_r45436425
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/RawScheme.java ---
    @@ -18,11 +18,13 @@
     package backtype.storm.spout;
     
     import backtype.storm.tuple.Fields;
    +
    +import java.nio.ByteBuffer;
     import java.util.List;
     import static backtype.storm.utils.Utils.tuple;
     
     public class RawScheme implements Scheme {
    -    public List<Object> deserialize(byte[] ser) {
    +    public List<Object> deserialize(ByteBuffer ser) {
    --- End diff --
    
    @harshach Actually I was referring to the receivers (the bolts) that might currently be doing something like `byte[] bytes = inputTuple.getBinaryByField("bytes");` to get the data emitted from the Kafka spout. It appears that the `deserialize` method returns a tuple that wraps the input ByteBuffer, which then gets emitted.
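
The concern can be sketched without Storm on the classpath. The example below assumes that the binary accessor on the receiving side simply casts the stored field value to `byte[]`; the class `ReceiverCompatDemo`, the `List<Object>` stand-in for a tuple, and both reader methods are hypothetical and exist only to illustrate the failure mode.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a bolt reading a binary field; not Storm code.
class ReceiverCompatDemo {
    // Mimics a receiver that assumes the field holds a byte[].
    static byte[] readAsBytes(List<Object> tuple) {
        return (byte[]) tuple.get(0);
    }

    // A tolerant reader that accepts either representation.
    static byte[] readTolerant(List<Object> tuple) {
        Object value = tuple.get(0);
        if (value instanceof byte[]) {
            return (byte[]) value;
        }
        // Copy the readable window without moving the original position.
        ByteBuffer buf = ((ByteBuffer) value).duplicate();
        byte[] out = new byte[buf.remaining()];
        buf.get(out);
        return out;
    }

    public static void main(String[] args) {
        List<Object> tuple =
            Collections.<Object>singletonList(ByteBuffer.wrap(new byte[] {1, 2, 3}));
        try {
            readAsBytes(tuple);
        } catch (ClassCastException e) {
            // The old-style receiver fails once the field holds a ByteBuffer.
            System.out.println("receiver broke: " + e.getClass().getSimpleName());
        }
        System.out.println(readTolerant(tuple).length);
    }
}
```

Under that assumption, existing bolts would fail at runtime rather than at compile time, which is why keeping `byte[]` in the emitted tuple matters.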



[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/894#issuecomment-159112473
  
    +1. @arunmahadevan can you take a look at it?



[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the pull request:

    https://github.com/apache/storm/pull/894#issuecomment-159140476
  
    Overall it looks good. I think https://github.com/apache/storm/pull/894#discussion_r45434875 should be addressed. Otherwise it may break existing bolts.



[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the pull request:

    https://github.com/apache/storm/pull/894#issuecomment-159699064
  
    Addressed in the latest patch.



[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/894



[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/894#discussion_r45436089
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/RawScheme.java ---
    @@ -18,11 +18,13 @@
     package backtype.storm.spout;
     
     import backtype.storm.tuple.Fields;
    +
    +import java.nio.ByteBuffer;
     import java.util.List;
     import static backtype.storm.utils.Utils.tuple;
     
     public class RawScheme implements Scheme {
    -    public List<Object> deserialize(byte[] ser) {
    +    public List<Object> deserialize(ByteBuffer ser) {
    --- End diff --
    
    @arunmahadevan It might, but I don't see anyone using this apart from KafkaSpout.



[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/894#discussion_r45563348
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/StringScheme.java ---
    @@ -20,23 +20,27 @@
     import backtype.storm.spout.Scheme;
     import backtype.storm.tuple.Fields;
     import backtype.storm.tuple.Values;
    +import backtype.storm.utils.Utils;
     
    -import java.io.UnsupportedEncodingException;
    +import java.nio.ByteBuffer;
    +import java.nio.charset.Charset;
    +import java.nio.charset.StandardCharsets;
     import java.util.List;
     
     public class StringScheme implements Scheme {
    -
    +    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
         public static final String STRING_SCHEME_KEY = "str";
     
    -    public List<Object> deserialize(byte[] bytes) {
    +    public List<Object> deserialize(ByteBuffer bytes) {
             return new Values(deserializeString(bytes));
         }
     
    -    public static String deserializeString(byte[] string) {
    -        try {
    -            return new String(string, "UTF-8");
    -        } catch (UnsupportedEncodingException e) {
    -            throw new RuntimeException(e);
    +    public static String deserializeString(ByteBuffer string) {
    +        if (!string.hasArray()) {
    +            int base = string.arrayOffset();
    +            return new String(string.array(), base + string.position(), base + string.limit());
    --- End diff --
    
    `String` ctor takes length as the last arg, so you need to pass `limit - pos`. Also the charset is missing.
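
A minimal sketch of the corrected call, assuming an array-backed buffer: the `String(byte[], int, int, Charset)` constructor takes an offset and a length, so the third argument is `limit - position` (that is, `remaining()`), and the charset is passed explicitly. The class `DecodeWindowDemo` and the method `decodeUtf8` are hypothetical names, not the code that was merged.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the patch under review.
class DecodeWindowDemo {
    // Decodes the readable window of an array-backed buffer without copying it first.
    static String decodeUtf8(ByteBuffer buf) {
        if (!buf.hasArray()) {
            throw new IllegalArgumentException("array-backed buffer required");
        }
        int start = buf.arrayOffset() + buf.position(); // index into the backing array
        int length = buf.remaining();                   // limit - position, not an end index
        return new String(buf.array(), start, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] backing = "xxhelloyy".getBytes(StandardCharsets.UTF_8);
        // Window covers bytes 2..6 of the backing array.
        ByteBuffer window = ByteBuffer.wrap(backing, 2, 5);
        System.out.println(decodeUtf8(window));
    }
}
```

Passing `base + limit` as the third argument, as in the diff, would either read past the intended window or throw an out-of-bounds exception whenever the window does not start at index 0.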



[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/894#issuecomment-159716347
  
    Thanks @haohui, merged into master.



[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/894#discussion_r45435067
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/StringScheme.java ---
    @@ -20,21 +20,23 @@
     import backtype.storm.spout.Scheme;
     import backtype.storm.tuple.Fields;
     import backtype.storm.tuple.Values;
    +import backtype.storm.utils.Utils;
     
     import java.io.UnsupportedEncodingException;
    +import java.nio.ByteBuffer;
     import java.util.List;
     
     public class StringScheme implements Scheme {
     
         public static final String STRING_SCHEME_KEY = "str";
     
    -    public List<Object> deserialize(byte[] bytes) {
    +    public List<Object> deserialize(ByteBuffer bytes) {
             return new Values(deserializeString(bytes));
         }
     
    -    public static String deserializeString(byte[] string) {
    +    public static String deserializeString(ByteBuffer string) {
             try {
    -            return new String(string, "UTF-8");
    +            return new String(Utils.toByteArray(string), "UTF-8");
    --- End diff --
    
    This would create a copy. If the ByteBuffer is array backed, you could use `ByteBuffer.array()` to avoid copy.



[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/894#discussion_r45434875
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/RawScheme.java ---
    @@ -18,11 +18,13 @@
     package backtype.storm.spout;
     
     import backtype.storm.tuple.Fields;
    +
    +import java.nio.ByteBuffer;
     import java.util.List;
     import static backtype.storm.utils.Utils.tuple;
     
     public class RawScheme implements Scheme {
    -    public List<Object> deserialize(byte[] ser) {
    +    public List<Object> deserialize(ByteBuffer ser) {
    --- End diff --
    
    Will this break compatibility at the receiver? Earlier the tuple contained a `byte[]`, and with this change it would contain a `ByteBuffer`.



[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/894#discussion_r45563324
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/StringScheme.java ---
    @@ -20,23 +20,27 @@
     import backtype.storm.spout.Scheme;
     import backtype.storm.tuple.Fields;
     import backtype.storm.tuple.Values;
    +import backtype.storm.utils.Utils;
     
    -import java.io.UnsupportedEncodingException;
    +import java.nio.ByteBuffer;
    +import java.nio.charset.Charset;
    +import java.nio.charset.StandardCharsets;
     import java.util.List;
     
     public class StringScheme implements Scheme {
    -
    +    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
         public static final String STRING_SCHEME_KEY = "str";
     
    -    public List<Object> deserialize(byte[] bytes) {
    +    public List<Object> deserialize(ByteBuffer bytes) {
             return new Values(deserializeString(bytes));
         }
     
    -    public static String deserializeString(byte[] string) {
    -        try {
    -            return new String(string, "UTF-8");
    -        } catch (UnsupportedEncodingException e) {
    -            throw new RuntimeException(e);
    +    public static String deserializeString(ByteBuffer string) {
    +        if (!string.hasArray()) {
    --- End diff --
    
    I think this should be `if (string.hasArray())`
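
To show why the condition matters, here is a sketch of both branches. Calling `array()` is only legal when `hasArray()` returns true; direct and read-only buffers need a fallback. The class `SchemeDecodeDemo` is a hypothetical name, and the fallback through `Charset.decode` is one possible choice, not necessarily what the final patch does.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; it only mirrors the shape of the method under review.
class SchemeDecodeDemo {
    static String deserializeString(ByteBuffer buf) {
        if (buf.hasArray()) {
            // Fast path: read straight from the backing array, no intermediate copy.
            int start = buf.arrayOffset() + buf.position();
            return new String(buf.array(), start, buf.remaining(), StandardCharsets.UTF_8);
        }
        // Fallback for direct or read-only buffers, where array() would throw.
        // duplicate() keeps the caller's position unchanged.
        return StandardCharsets.UTF_8.decode(buf.duplicate()).toString();
    }

    public static void main(String[] args) {
        byte[] bytes = "hello".getBytes(StandardCharsets.UTF_8);

        ByteBuffer heap = ByteBuffer.wrap(bytes);
        ByteBuffer direct = ByteBuffer.allocateDirect(bytes.length);
        direct.put(bytes);
        direct.flip(); // switch from writing to reading

        System.out.println(deserializeString(heap));
        System.out.println(deserializeString(direct));
    }
}
```

With the inverted check from the diff, the array path would run exactly for the buffers that have no accessible array.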


