You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "huangyu (JIRA)" <ji...@apache.org> on 2016/03/03 17:19:18 UTC

[jira] [Updated] (SPARK-13652) spark netty network issue

     [ https://issues.apache.org/jira/browse/SPARK-13652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

huangyu updated SPARK-13652:
----------------------------
    Description: 
TransportClient is not thread-safe: if it is called from multiple threads, the messages are not encoded and decoded correctly. Below is my code, and it prints wrong messages.

/**
 * Reproduction driver for the reported problem: starts a transport server on port 8081,
 * then runs 10 threads that each send 1000 synchronous RPCs whose payload is the loop
 * counter, printing every case where the returned value differs from the value sent.
 *
 * NOTE(review): RankHandler is not shown in this report; the check below only makes sense
 * if it echoes the received long back unchanged - confirm against its source.
 */
public static void main(String[] args) throws IOException, InterruptedException {

        // Server side: listens on port 8081 with RankHandler as the RPC handler and no bootstraps.
        TransportServer server = new TransportContext(new TransportConf("test",
                new MapConfigProvider(new HashMap<String, String>())), new RankHandler()).
                createServer(8081, new LinkedList<TransportServerBootstrap>());

        // Client side: a separate context with a no-op handler, used only to build the client factory.
        // NOTE(review): the meaning of the trailing `true` argument is not visible here - confirm.
        TransportContext context = new TransportContext(new TransportConf("test",
                new MapConfigProvider(new HashMap<String, String>())), new NoOpRpcHandler(), true);
        // One factory instance is shared by all sender threads below.
        final TransportClientFactory clientFactory = context.createClientFactory();
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            ts.add(new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 1000; j++) {
                        try {
                            // Request payload: the loop counter written as a single 8-byte long.
                            ByteBuf buf = Unpooled.buffer(8);
                            buf.writeLong((long) j);
                            // createClient is called on every iteration with the same host/port.
                            // Presumably the factory hands the same underlying client to all
                            // threads (TODO confirm); Long.MAX_VALUE is presumably the timeout.
                            ByteBuffer byteBuffer = clientFactory.createClient("localhost", 8081).
                                    sendRpcSync(buf.nioBuffer(), Long.MAX_VALUE);

                            // Report to stderr whenever the reply does not match what this thread sent.
                            long response = byteBuffer.getLong();
                            if (response != j) {
                                System.err.println("send:" + j + ",response:" + response);
                            }
                        } catch (IOException e) {
                            // Failures are only printed; the thread continues with the next value.
                            e.printStackTrace();
                        }
                    }
                }
            }));
            // Each thread is started as soon as it is created, so senders overlap.
            ts.get(i).start();
        }
        // Wait for every sender to finish before shutting the server down.
        for (Thread t : ts) {
            t.join();
        }
        server.close();

    }



It prints output like the following:
send:221,response:222
send:233,response:234
send:312,response:313
send:358,response:359
...

  was:
TransportClient is not thread-safe: if it is called from multiple threads, the messages are not encoded and decoded correctly. Below is my code, and it prints wrong messages.

/**
 * Reproduction driver for the reported problem: starts a transport server on port 8081,
 * then runs 10 threads that each send 1000 synchronous RPCs whose payload is the loop
 * counter, printing every case where the returned value differs from the value sent.
 *
 * NOTE(review): RankHandler is not shown in this report; the check below only makes sense
 * if it echoes the received long back unchanged - confirm against its source.
 */
public static void main(String[] args) throws IOException, InterruptedException {

        // Server side: listens on port 8081 with RankHandler as the RPC handler and no bootstraps.
        TransportServer server = new TransportContext(new TransportConf("test",
                new MapConfigProvider(new HashMap<String, String>())), new RankHandler()).
                createServer(8081, new LinkedList<TransportServerBootstrap>());

        // Client side: a separate context with a no-op handler, used only to build the client factory.
        // NOTE(review): the meaning of the trailing `true` argument is not visible here - confirm.
        TransportContext context = new TransportContext(new TransportConf("test",
                new MapConfigProvider(new HashMap<String, String>())), new NoOpRpcHandler(), true);
        // One factory instance is shared by all sender threads below.
        final TransportClientFactory clientFactory = context.createClientFactory();
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            ts.add(new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 1000; j++) {
                        try {
                            // Request payload: the loop counter written as a single 8-byte long.
                            ByteBuf buf = Unpooled.buffer(8);
                            buf.writeLong((long) j);
                            // createClient is called on every iteration with the same host/port.
                            // Presumably the factory hands the same underlying client to all
                            // threads (TODO confirm); Long.MAX_VALUE is presumably the timeout.
                            ByteBuffer byteBuffer = clientFactory.createClient("localhost", 8081).
                                    sendRpcSync(buf.nioBuffer(), Long.MAX_VALUE);

                            // Report to stderr whenever the reply does not match what this thread sent.
                            long response = byteBuffer.getLong();
                            if (response != j) {
                                System.err.println("send:" + j + ",response:" + response);
                            }
                        } catch (IOException e) {
                            // Failures are only printed; the thread continues with the next value.
                            e.printStackTrace();
                        }
                    }
                }
            }));
            // Each thread is started as soon as it is created, so senders overlap.
            ts.get(i).start();
        }
        // Wait for every sender to finish before shutting the server down.
        for (Thread t : ts) {
            t.join();
        }
        server.close();

    }


> spark netty network issu
> ------------------------
>
>                 Key: SPARK-13652
>                 URL: https://issues.apache.org/jira/browse/SPARK-13652
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.5.1, 1.5.2, 1.6.0
>            Reporter: huangyu
>
> TransportClient is not thread safe and if it is called from multiple threads, the messages can't be encoded and decoded correctly. Below is my code,and it will print wrong message.
> public static void main(String[] args) throws IOException, InterruptedException {
>         TransportServer server = new TransportContext(new TransportConf("test",
>                 new MapConfigProvider(new HashMap<String, String>())), new RankHandler()).
>                 createServer(8081, new LinkedList<TransportServerBootstrap>());
>         TransportContext context = new TransportContext(new TransportConf("test",
>                 new MapConfigProvider(new HashMap<String, String>())), new NoOpRpcHandler(), true);
>         final TransportClientFactory clientFactory = context.createClientFactory();
>         List<Thread> ts = new ArrayList<>();
>         for (int i = 0; i < 10; i++) {
>             ts.add(new Thread(new Runnable() {
>                 @Override
>                 public void run() {
>                     for (int j = 0; j < 1000; j++) {
>                         try {
>                             ByteBuf buf = Unpooled.buffer(8);
>                             buf.writeLong((long) j);
>                             ByteBuffer byteBuffer = clientFactory.createClient("localhost", 8081).
>                                     sendRpcSync(buf.nioBuffer(), Long.MAX_VALUE);
>                             long response = byteBuffer.getLong();
>                             if (response != j) {
>                                 System.err.println("send:" + j + ",response:" + response);
>                             }
>                         } catch (IOException e) {
>                             e.printStackTrace();
>                         }
>                     }
>                 }
>             }));
>             ts.get(i).start();
>         }
>         for (Thread t : ts) {
>             t.join();
>         }
>         server.close();
>     }
> it will print as below
> send:221,response:222
> send:233,response:234
> send:312,response:313
> send:358,response:359
> ...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org