Posted to issues@spark.apache.org by "huangyu (JIRA)" <ji...@apache.org> on 2016/03/04 03:04:40 UTC

[jira] [Commented] (SPARK-13652) TransportClient.sendRpcSync returns wrong results

    [ https://issues.apache.org/jira/browse/SPARK-13652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15179157#comment-15179157 ] 

huangyu commented on SPARK-13652:
---------------------------------

I think this is related to stream fetching rather than to sendRpc itself.

> TransportClient.sendRpcSync returns wrong results
> -------------------------------------------------
>
>                 Key: SPARK-13652
>                 URL: https://issues.apache.org/jira/browse/SPARK-13652
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.6.0
>            Reporter: huangyu
>         Attachments: RankHandler.java, Test.java
>
>
> TransportClient is not thread safe. If it is called from multiple threads, messages are not encoded and decoded correctly. Below is my code; it prints mismatched request/response pairs.
> {code}
> public static void main(String[] args) throws IOException, InterruptedException {
>         TransportServer server = new TransportContext(new TransportConf("test",
>                 new MapConfigProvider(new HashMap<String, String>())), new RankHandler()).
>                 createServer(8081, new LinkedList<TransportServerBootstrap>());
>         TransportContext context = new TransportContext(new TransportConf("test",
>                 new MapConfigProvider(new HashMap<String, String>())), new NoOpRpcHandler(), true);
>         final TransportClientFactory clientFactory = context.createClientFactory();
>         List<Thread> ts = new ArrayList<>();
>         for (int i = 0; i < 10; i++) {
>             ts.add(new Thread(new Runnable() {
>                 @Override
>                 public void run() {
>                     for (int j = 0; j < 1000; j++) {
>                         try {
>                             ByteBuf buf = Unpooled.buffer(8);
>                             buf.writeLong((long) j);
>                             ByteBuffer byteBuffer = clientFactory.createClient("localhost", 8081).
>                                     sendRpcSync(buf.nioBuffer(), Long.MAX_VALUE);
>                             long response = byteBuffer.getLong();
>                             if (response != j) {
>                                 System.err.println("send:" + j + ",response:" + response);
>                             }
>                         } catch (IOException e) {
>                             e.printStackTrace();
>                         }
>                     }
>                 }
>             }));
>             ts.get(i).start();
>         }
>         for (Thread t : ts) {
>             t.join();
>         }
>         server.close();
>     }
>
> // RankHandler.java
> public class RankHandler extends RpcHandler {
>     private final Logger logger = LoggerFactory.getLogger(RankHandler.class);
>     private final StreamManager streamManager;
>     public RankHandler() {
>         this.streamManager = new OneForOneStreamManager();
>     }
>     @Override
>     public void receive(TransportClient client, ByteBuffer msg, RpcResponseCallback callback) {
>         callback.onSuccess(msg);
>     }
>     @Override
>     public StreamManager getStreamManager() {
>         return streamManager;
>     }
> }
> {code}
> It prints output like the following:
> send:221,response:222
> send:233,response:234
> send:312,response:313
> send:358,response:359
> ...

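The thread does not establish the cause. In every mismatch quoted above, the response equals the sent value plus one, which is the value sent by the next loop iteration. One pattern that would produce this symptom is a caller holding a view of a buffer that the transport layer later reuses for another message. This is only an assumption, not a confirmed diagnosis. The following minimal sketch reproduces that pattern with java.nio.ByteBuffer alone; it uses no Spark classes, and the names RecycledBufferSketch, receiveView and receiveCopy are hypothetical.

```java
import java.nio.ByteBuffer;

public class RecycledBufferSketch {
    // Hypothetical stand-in for a transport-owned buffer that is reused
    // for every incoming message.
    private static final ByteBuffer SHARED = ByteBuffer.allocate(8);

    /** Writes the value into the shared buffer and returns a view over the same memory (no copy). */
    public static ByteBuffer receiveView(long value) {
        SHARED.clear();
        SHARED.putLong(value);
        SHARED.flip();
        // duplicate() has its own position and limit but shares the content.
        return SHARED.duplicate();
    }

    /** Same as receiveView, but copies the bytes into a buffer owned by the caller. */
    public static ByteBuffer receiveCopy(long value) {
        ByteBuffer view = receiveView(value);
        ByteBuffer copy = ByteBuffer.allocate(view.remaining());
        copy.put(view);
        copy.flip();
        return copy;
    }

    public static void main(String[] args) {
        ByteBuffer view = receiveView(221L);
        ByteBuffer copy = receiveCopy(221L);
        // The "next message" overwrites the shared memory before the caller reads.
        receiveView(222L);
        System.out.println("view: " + view.getLong()); // prints "view: 222"
        System.out.println("copy: " + copy.getLong()); // prints "copy: 221"
    }
}
```

If the real cause is similar, copying the response bytes before handing them to the caller would be the natural mitigation; whether that applies to TransportClient needs to be checked against the Spark source.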


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
