You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "li yuntian (JIRA)" <ji...@apache.org> on 2017/08/18 07:38:00 UTC

[jira] [Updated] (BEAM-2778) Serialization stack error using spark stream

     [ https://issues.apache.org/jira/browse/BEAM-2778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

li yuntian updated BEAM-2778:
-----------------------------
    Description: 
options......
Pipeline pipeline = Pipeline.create(options);
KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
                .withBootstrapServers("10.139.7.xx:9092")
                .withTopics(Collections.singletonList("test"))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class);
PCollection<String> kafkaJsonPc = pipeline.apply(read.withoutMetadata())
       .apply(Window.<KV<String,String>>into(FixedWindows.of(Duration.standardMinutes(1))))
                .apply(Values.<String> create());
        kafkaJsonPc.apply(WithKeys.<String, String>of("global"))
                .apply(GroupByKey.<String, String>create());

I get errors, IF I DELETE " .apply(GroupByKey.<String, String>create())"   everything is fine.
SO I think is there something wrong with GroupBy Transform in spark streaming?
I find a jira https://issues.apache.org/jira/browse/BEAM-1624  is these the same? when to fix?

errors:
17/08/18 15:31:37 INFO BlockManagerInfo: Added broadcast_42_piece0 in memory on localhost:56153 (size: 399.0 B, free: 1804.1 MB)
17/08/18 15:31:37 INFO SparkContext: Created broadcast 42 from broadcast at GlobalWatermarkHolder.java:135
17/08/18 15:31:37 WARN JobGenerator: Timed out while stopping the job generator (timeout = 5000)
17/08/18 15:31:37 INFO JobGenerator: Waited for jobs to be processed and checkpoints to be written
17/08/18 15:31:47 INFO CheckpointWriter: CheckpointWriter executor terminated ? false, waited for 10001 ms.
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 INFO JobGenerator: Stopped JobGenerator
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN RetryInvocationHandler: Exception while invoking class org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename. Not retrying because failovers (15) exceeded maximum allowed (15)
java.io.IOException: java.lang.InterruptedException
	at org.apache.hadoop.ipc.Client.call(Client.java:1326)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	... 16 more
17/08/18 15:31:47 WARN CheckpointWriter: Error in attempt 1 of writing checkpoint to hdfs://bchcluster/tmp/Beam Job Spark0.58879054/spark-checkpoint/checkpoint-1503041490500
java.io.IOException: java.lang.InterruptedException
	at org.apache.hadoop.ipc.Client.call(Client.java:1326)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	... 16 more
17/08/18 15:31:47 WARN CheckpointWriter: Could not write checkpoint for time 1503041488000 ms to file hdfs://bchcluster/tmp/Beam Job Spark0.58879054/spark-checkpoint/checkpoint-1503041490500'
17/08/18 15:31:47 INFO JobScheduler: Stopped JobScheduler
17/08/18 15:31:47 INFO StreamingContext: StreamingContext stopped successfully
Exception in thread "main" java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 93.0 (TID 30) had a not serializable result: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers
Serialization stack:
	- object not serializable (class: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers, value: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55)
	- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
	- object (class scala.Tuple2, (org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[]))
	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
	- object (class scala.Tuple2, (org.apache.beam.runners.spark.util.ByteArray@eacf3ee4,(org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[])))
	at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:55)
	at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
	at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:41)
	at org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:163)
	at org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:198)
	at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
	at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:87)
	at com.chinamobile.cmss.example.TwoCountBaseApp.main(TwoCountBaseApp.java:83)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 93.0 (TID 30) had a not serializable result: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers
Serialization stack:
	- object not serializable (class: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers, value: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55)
	- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
	- object (class scala.Tuple2, (org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[]))
	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
	- object (class scala.Tuple2, (org.apache.beam.runners.spark.util.ByteArray@eacf3ee4,(org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[])))
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
	at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:332)
	at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46)
	at org.apache.beam.runners.spark.translation.streaming.UnboundedDataset$1.call(UnboundedDataset.java:77)
	at org.apache.beam.runners.spark.translation.streaming.UnboundedDataset$1.call(UnboundedDataset.java:74)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
	at scala.util.Try$.apply(Try.scala:161)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745) 


  was:
options......
Pipeline pipeline = Pipeline.create(options);
KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
                .withBootstrapServers("10.139.7.xx:9092")
                .withTopics(Collections.singletonList("test"))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class);
PCollection<String> kafkaJsonPc = pipeline.apply(read.withoutMetadata())
       .apply(Window.<KV<String,String>>into(FixedWindows.of(Duration.standardMinutes(1))))
                .apply(Values.<String> create());
        kafkaJsonPc.apply(WithKeys.<String, String>of("global"))
                .apply(GroupByKey.<String, String>create());

I get error:
17/08/18 15:31:37 INFO BlockManagerInfo: Added broadcast_42_piece0 in memory on localhost:56153 (size: 399.0 B, free: 1804.1 MB)
17/08/18 15:31:37 INFO SparkContext: Created broadcast 42 from broadcast at GlobalWatermarkHolder.java:135
17/08/18 15:31:37 WARN JobGenerator: Timed out while stopping the job generator (timeout = 5000)
17/08/18 15:31:37 INFO JobGenerator: Waited for jobs to be processed and checkpoints to be written
17/08/18 15:31:47 INFO CheckpointWriter: CheckpointWriter executor terminated ? false, waited for 10001 ms.
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 INFO JobGenerator: Stopped JobGenerator
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/08/18 15:31:47 WARN RetryInvocationHandler: Exception while invoking class org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename. Not retrying because failovers (15) exceeded maximum allowed (15)
java.io.IOException: java.lang.InterruptedException
	at org.apache.hadoop.ipc.Client.call(Client.java:1326)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	... 16 more
17/08/18 15:31:47 WARN CheckpointWriter: Error in attempt 1 of writing checkpoint to hdfs://bchcluster/tmp/Beam Job Spark0.58879054/spark-checkpoint/checkpoint-1503041490500
java.io.IOException: java.lang.InterruptedException
	at org.apache.hadoop.ipc.Client.call(Client.java:1326)
	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
	at com.sun.proxy.$Proxy38.rename(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy41.rename(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
	... 16 more
17/08/18 15:31:47 WARN CheckpointWriter: Could not write checkpoint for time 1503041488000 ms to file hdfs://bchcluster/tmp/Beam Job Spark0.58879054/spark-checkpoint/checkpoint-1503041490500'
17/08/18 15:31:47 INFO JobScheduler: Stopped JobScheduler
17/08/18 15:31:47 INFO StreamingContext: StreamingContext stopped successfully
Exception in thread "main" java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 93.0 (TID 30) had a not serializable result: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers
Serialization stack:
	- object not serializable (class: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers, value: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55)
	- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
	- object (class scala.Tuple2, (org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[]))
	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
	- object (class scala.Tuple2, (org.apache.beam.runners.spark.util.ByteArray@eacf3ee4,(org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[])))
	at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:55)
	at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
	at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:41)
	at org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:163)
	at org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:198)
	at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
	at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:87)
	at com.chinamobile.cmss.example.TwoCountBaseApp.main(TwoCountBaseApp.java:83)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 93.0 (TID 30) had a not serializable result: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers
Serialization stack:
	- object not serializable (class: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers, value: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55)
	- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
	- object (class scala.Tuple2, (org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[]))
	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
	- object (class scala.Tuple2, (org.apache.beam.runners.spark.util.ByteArray@eacf3ee4,(org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[])))
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
	at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:332)
	at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46)
	at org.apache.beam.runners.spark.translation.streaming.UnboundedDataset$1.call(UnboundedDataset.java:77)
	at org.apache.beam.runners.spark.translation.streaming.UnboundedDataset$1.call(UnboundedDataset.java:74)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
	at scala.util.Try$.apply(Try.scala:161)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745) 
IF I DELETE .apply(GroupByKey.<String, String>create())   everything is fine.
SO I think is there something wrong with GroupBy Transform in spark streaming?
I find a jira https://issues.apache.org/jira/browse/BEAM-1624  is these the same? when to fix?


> Serialization stack error  using spark stream 
> ----------------------------------------------
>
>                 Key: BEAM-2778
>                 URL: https://issues.apache.org/jira/browse/BEAM-2778
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.0.0
>            Reporter: li yuntian
>            Assignee: Amit Sela
>
> options......
> Pipeline pipeline = Pipeline.create(options);
> KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
>                 .withBootstrapServers("10.139.7.xx:9092")
>                 .withTopics(Collections.singletonList("test"))
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializer(StringDeserializer.class);
> PCollection<String> kafkaJsonPc = pipeline.apply(read.withoutMetadata())
>        .apply(Window.<KV<String,String>>into(FixedWindows.of(Duration.standardMinutes(1))))
>                 .apply(Values.<String> create());
>         kafkaJsonPc.apply(WithKeys.<String, String>of("global"))
>                 .apply(GroupByKey.<String, String>create());
> I get errors, IF I DELETE " .apply(GroupByKey.<String, String>create())"   everything is fine.
> SO I think is there something wrong with GroupBy Transform in spark streaming?
> I find a jira https://issues.apache.org/jira/browse/BEAM-1624  is these the same? when to fix?
> errors:
> 17/08/18 15:31:37 INFO BlockManagerInfo: Added broadcast_42_piece0 in memory on localhost:56153 (size: 399.0 B, free: 1804.1 MB)
> 17/08/18 15:31:37 INFO SparkContext: Created broadcast 42 from broadcast at GlobalWatermarkHolder.java:135
> 17/08/18 15:31:37 WARN JobGenerator: Timed out while stopping the job generator (timeout = 5000)
> 17/08/18 15:31:37 INFO JobGenerator: Waited for jobs to be processed and checkpoints to be written
> 17/08/18 15:31:47 INFO CheckpointWriter: CheckpointWriter executor terminated ? false, waited for 10001 ms.
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 INFO JobGenerator: Stopped JobGenerator
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN RetryInvocationHandler: Exception while invoking class org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename. Not retrying because failovers (15) exceeded maximum allowed (15)
> java.io.IOException: java.lang.InterruptedException
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1326)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	... 16 more
> 17/08/18 15:31:47 WARN CheckpointWriter: Error in attempt 1 of writing checkpoint to hdfs://bchcluster/tmp/Beam Job Spark0.58879054/spark-checkpoint/checkpoint-1503041490500
> java.io.IOException: java.lang.InterruptedException
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1326)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy38.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
> 	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy41.rename(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
> 	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1320)
> 	... 16 more
> 17/08/18 15:31:47 WARN CheckpointWriter: Could not write checkpoint for time 1503041488000 ms to file hdfs://bchcluster/tmp/Beam Job Spark0.58879054/spark-checkpoint/checkpoint-1503041490500'
> 17/08/18 15:31:47 INFO JobScheduler: Stopped JobScheduler
> 17/08/18 15:31:47 INFO StreamingContext: StreamingContext stopped successfully
> Exception in thread "main" java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 93.0 (TID 30) had a not serializable result: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers
> Serialization stack:
> 	- object not serializable (class: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers, value: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55)
> 	- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
> 	- object (class scala.Tuple2, (org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[]))
> 	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
> 	- object (class scala.Tuple2, (org.apache.beam.runners.spark.util.ByteArray@eacf3ee4,(org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[])))
> 	at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:55)
> 	at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
> 	at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:41)
> 	at org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:163)
> 	at org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:198)
> 	at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
> 	at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:87)
> 	at com.chinamobile.cmss.example.TwoCountBaseApp.main(TwoCountBaseApp.java:83)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 93.0 (TID 30) had a not serializable result: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers
> Serialization stack:
> 	- object not serializable (class: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers, value: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55)
> 	- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
> 	- object (class scala.Tuple2, (org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[]))
> 	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
> 	- object (class scala.Tuple2, (org.apache.beam.runners.spark.util.ByteArray@eacf3ee4,(org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[])))
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> 	at scala.Option.foreach(Option.scala:236)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
> 	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
> 	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> 	at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
> 	at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:332)
> 	at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46)
> 	at org.apache.beam.runners.spark.translation.streaming.UnboundedDataset$1.call(UnboundedDataset.java:77)
> 	at org.apache.beam.runners.spark.translation.streaming.UnboundedDataset$1.call(UnboundedDataset.java:74)
> 	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
> 	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
> 	at scala.util.Try$.apply(Try.scala:161)
> 	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745) 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)