You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by li...@itri.org.tw on 2018/06/13 09:26:32 UTC

kafkaIO Run with Spark Runner: "streaming-job-executor-0"

Dear all,

I am using the kafkaIO in my project (Beam 2.0.0 with Spark runner).
My running environment is:
OS: Ubuntn 14.04.3 LTS
The different version for these tools:
JAVA: JDK 1.8
Beam 2.0.0 (Spark runner with Standalone mode)
Spark 1.6.0
Standalone mode :One driver node: ubuntu7; One master node: ubuntu8; Two worker nodes: ubuntu8 and ubuntu9
Kafka: 2.10-0.10.1.1

The java code of my project is:
==============================================================================
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("spark://ubuntu8:7077");
options.setAppName("App kafkaBeamTest");
options.setJobName("Job kafkaBeamTest");
options.setMaxRecordsPerBatch(1000L);

Pipeline p = Pipeline.create(options);

System.out.println("Beamtokafka");
PCollection<KV<Long, String>> readData = p.apply(KafkaIO.<Long, String>read()
.withBootstrapServers(ubuntu7:9092)
.withTopic("kafkasink")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
       .withoutMetadata()
       );

PCollection<KV<Long, String>> readDivideData = readData.
apply(Window.<KV<Long,String>>into(FixedWindows.of(Duration.standardSeconds(1)))
     .triggering(AfterWatermark.pastEndOfWindow()
       .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
     .withAllowedLateness(Duration.ZERO) .discardingFiredPanes());

System.out.println("CountData");

PCollection<KV<Long, Long>> countData = readDivideData.apply(Count.perKey());

p.run();
==============================================================================

The message of error is:
==============================================================================
Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:502)
        at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
…
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
... 2 more
==============================================================================

Maven 3.5.0, in which related dependencies are listed in my project’s pom.xml:
<dependency>
<groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
   <artifactId>beam-sdks-java-io-kafka</artifactId>
   <version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.6.0</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.10.1.1</version>
</dependency>


When I use the above code in Spark Runner (Local [4]), this project worked well (2000~4000 data/s). However, if I run it on Standalone mode, it failed along with the above error.

If you have any idea about the error ("streaming-job-executor-0"), I am looking forward to hearing from you.

Note that: perform command line is “./spark-submit --class com.itri.beam.kafkatest --master spark:// ubuntu8:7077 /root/BeamKafkaAdvanced-0.1-shaded.jar --runner=SparkRunner”

Thanks

Rick




--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.

RE: kafkaIO Run with Spark Runner: "streaming-job-executor-0"

Posted by li...@itri.org.tw.
Hi,

I insert one a text in my code as follows:
…
.apply(Window.<String>in.of(Duration.standardSeconds(1)))
              .triggering(AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
              .withAllowedLateness(Duration.standardSeconds(2))
              .discardingFiredPanes())

.apply(Count.perElement());

//"JdbcData to DB"
.apply(JdbcIO.<Long>write()
  .withDataSourceConfiguration
…

Rick

From: linrick@itri.org.tw [mailto:linrick@itri.org.tw]
Sent: Thursday, June 21, 2018 10:42 AM
To: user@beam.apache.org
Subject: RE: kafkaIO Run with Spark Runner: "streaming-job-executor-0"

Dear all,

The related version in my running env:
Beam 2.4.0 (Spark runner with Standalone mode)
Spark 2.0.0
Kafka: 2.11-0.10.1.1
, and the setting of Pom.xml as mentioned earlier.

When we run the following code:

args=new String[]{"--runner=SparkRunner","--sparkMaster=spark://ubuntu8:7077"};
SparkPipelineOptions options=PipelineOptionsFactory.fromArgs(args).withValidation().as(SparkPipelineOptions.class);

PCollection<String> readData = p

//"Use kafkaIO to inject data"
.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
.withMaxNumRecords(100000)  // If I set this parameter, the code works.;If I do not set this parameter, the executor of Spark will be closed.
    .withoutMetadata())
.apply(Values.<String>create())

//"FixedWindows to collect data"
.apply(Window.<String>in.of(Duration.standardSeconds(1)))
              .triggering(AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
              .withAllowedLateness(Duration.standardSeconds(2))
              .discardingFiredPanes())
//"JdbcData to DB"
.apply(JdbcIO.<Long>write()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
      "org.postgresql.Driver",
      "jdbc:postgresql://ubuntu7:5432/raw_c42a25f4bd3d74429dbeb6162e60e5c7")
      .withUsername("postgres")
      .withPassword("postgres"))
     .withStatement("insert into kafkabeamdata (count) values(?)")
     .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<Long>() {
     @Override
     public void setParameters(Long element, PreparedStatement query)
      throws SQLException {
        double count = element.doubleValue();
        query.setDouble(1, count);}
        }));

p.run();

However, this project need to continuously inject data from kafka and input result into PostgreSQL DB every one second.
If we set parameter “withMaxNumRecords(100000)”, the code works and input result once until the collected data of number is 100000.

In addition, if I run this code under Beam (Spark runner with Local[4]), it is successful (continuously inject data).

Therefore, we are not sure if this may be a bug about kafkaIO?

If any further information is needed, we are glad to be informed and will provide to you as soon as possible.

We are looking forward to hearing from you.

Thanks very much for your attention to our problem.

Sincerely yours,

Rick


From: linrick@itri.org.tw<ma...@itri.org.tw> [mailto:linrick@itri.org.tw]
Sent: Thursday, June 14, 2018 2:48 PM
To: user@beam.apache.org<ma...@beam.apache.org>
Subject: RE: kafkaIO Run with Spark Runner: "streaming-job-executor-0"


Hi all,



I have changed these versions of tools, as:



Beam 2.4.0 (Spark runner with Standalone mode)

Spark 2.0.0

Kafka: 2.11-0.10.1.1
Pom.xml:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.4.0</version>
</dependency>

<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-spark</artifactId>
   <version>2.4.0</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.4.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>

  <version>2.0.0</version>
</dependency>


<dependency>
<groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.10.1.1</version>

</dependency>



Here, there is new error:



ubuntu9 worker log:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/06/14 14:36:27 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 22990@ubuntu9
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for TERM
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for HUP
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for INT
18/06/14 14:36:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/06/14 14:36:28 INFO SecurityManager: Changing view acls to: root
18/06/14 14:36:28 INFO SecurityManager: Changing modify acls to: root
18/06/14 14:36:28 INFO SecurityManager: Changing view acls groups to:
18/06/14 14:36:28 INFO SecurityManager: Changing modify acls groups to:

…

18/06/14 14:36:28 INFO DiskBlockManager: Created local directory at /tmp/spark-e035a190-2ab4-4167-b0c1-7868bac7afc1/executor-083a9db8-994a-4860-b2fa-d5f490bac01d/blockmgr-568096a9-2060-4717-a3e4-99d4abcee7bd

18/06/14 14:36:28 INFO MemoryStore: MemoryStore started with capacity 5.2 GB

18/06/14 14:36:28 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

B





Ubuntu8 worker log:

18/06/14 14:36:30 INFO MemoryStore: MemoryStore started with capacity 5.2 GB

18/06/14 14:36:30 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@ubuntu7:43572

18/06/14 14:36:30 INFO WorkerWatcher: Connecting to worker spark://Worker@ubuntu8:39499

18/06/14 14:36:30 ERROR CoarseGrainedExecutorBackend: Cannot register with driver: spark://



I have no idea about the error.



In addition, related configures of spark are:



Master node:

Spark-env.sh

export SPARK_MASTER_HOST="xx.xxx.x.x"

export SPARK_MASTER_WEBUI_PORT=1234



Spark-default.conf

spark.driver.memory              10g

spark.executor.memory            2g

spark.executor.instances           4



Worker node:

Spark-env.sh

export SPARK_MASTER_HOST="xx.xxx.x.x"

export SPARK_MASTER_WEBUI_PORT=1234



Spark-default.conf

spark.driver.memory              10g

spark.executor.memory            2g

spark.executor.instances           4



Rick



-----Original Message-----
From: Ismaël Mejía [mailto:iemejia@gmail.com]
Sent: Wednesday, June 13, 2018 11:35 PM
To: user@beam.apache.org<ma...@beam.apache.org>
Subject: Re: kafkaIO Run with Spark Runner: "streaming-job-executor-0"



Can you please update the version of Beam to at least version 2.2.0.

There were some important fixes in streaming after the 2.0.0 release so this could be related. Ideally you should use the latest released version (2.4.0). Remember that starting with Beam 2.3.0 the Spark runner is based on Spark 2.



On Wed, Jun 13, 2018 at 5:11 PM Raghu Angadi <ra...@google.com>> wrote:

>

> Can you check the logs on the worker?

>

> On Wed, Jun 13, 2018 at 2:26 AM <li...@itri.org.tw>> wrote:

>>

>> Dear all,

>>

>>

>>

>> I am using the kafkaIO in my project (Beam 2.0.0 with Spark runner).

>>

>> My running environment is:

>>

>> OS: Ubuntn 14.04.3 LTS

>>

>> The different version for these tools:

>>

>> JAVA: JDK 1.8

>>

>> Beam 2.0.0 (Spark runner with Standalone mode)

>>

>> Spark 1.6.0

>>

>> Standalone mode :One driver node: ubuntu7; One master node: ubuntu8;

>> Two worker nodes: ubuntu8 and ubuntu9

>>

>> Kafka: 2.10-0.10.1.1

>>

>>

>>

>> The java code of my project is:

>>

>> =====================================================================

>> =========

>>

>> SparkPipelineOptions options =

>> PipelineOptionsFactory.as(SparkPipelineOptions.class);

>>

>> options.setRunner(SparkRunner.class);

>>

>> options.setSparkMaster("spark://ubuntu8:7077");

>>

>> options.setAppName("App kafkaBeamTest");

>>

>> options.setJobName("Job kafkaBeamTest");

>>

>> options.setMaxRecordsPerBatch(1000L);

>>

>>

>>

>> Pipeline p = Pipeline.create(options);

>>

>>

>>

>> System.out.println("Beamtokafka");

>>

>> PCollection<KV<Long, String>> readData = p.apply(KafkaIO.<Long,

>> String>read()

>>

>> .withBootstrapServers(ubuntu7:9092)

>>

>> .withTopic("kafkasink")

>>

>> .withKeyDeserializer(LongDeserializer.class)

>>

>> .withValueDeserializer(StringDeserializer.class)

>>

>>        .withoutMetadata()

>>

>>        );

>>

>>

>>

>> PCollection<KV<Long, String>> readDivideData = readData.

>>

>> apply(Window.<KV<Long,String>>into(FixedWindows.of(Duration.standardS

>> econds(1)))

>>

>>      .triggering(AfterWatermark.pastEndOfWindow()

>>

>>

>> .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDel

>> ayOf(Duration.ZERO)))

>>

>>      .withAllowedLateness(Duration.ZERO) .discardingFiredPanes());

>>

>>

>>

>> System.out.println("CountData");

>>

>>

>>

>> PCollection<KV<Long, Long>> countData =

>> readDivideData.apply(Count.perKey());

>>

>>

>>

>> p.run();

>>

>> =====================================================================

>> =========

>>

>>

>>

>> The message of error is:

>>

>> =====================================================================

>> =========

>>

>> Exception in thread "streaming-job-executor-0" java.lang.Error:

>> java.lang.InterruptedException

>>

>>         at

>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.

>> java:1155)

>>

>>         at

>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor

>> .java:624)

>>

>>         at java.lang.Thread.run(Thread.java:748)

>>

>> Caused by: java.lang.InterruptedException

>>

>>         at java.lang.Object.wait(Native Method)

>>

>>         at java.lang.Object.wait(Object.java:502)

>>

>>         at

>> org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)

>>

>>         at

>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612

>> )

>>

>>         at

>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)

>>

>>         at

>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)

>>

>>         at

>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)

>>

>>         at

>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)

>>

>>         at

>> org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)

>>

>>         at

>> org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)

>>

>>         at

>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.s

>> cala:150)

>>

>>         at

>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.s

>> cala:111)

>>

>>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

>>

>>         at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)

>>

>> …

>>

>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)

>>

>> at

>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun

>> $run$1.apply$mcV$sp(JobScheduler.scala:224)

>>

>> at

>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun

>> $run$1.apply(JobScheduler.scala:224)

>>

>> at

>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun

>> $run$1.apply(JobScheduler.scala:224)

>>

>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

>>

>> at

>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobS

>> cheduler.scala:223)

>>

>> at

>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.

>> java:1149)

>>

>> ... 2 more

>>

>> =====================================================================

>> =========

>>

>>

>>

>> Maven 3.5.0, in which related dependencies are listed in my project’s pom.xml:

>>

>> <dependency>

>>

>> <groupId>org.apache.beam</groupId>

>>

>>   <artifactId>beam-sdks-java-core</artifactId>

>>

>>   <version>2.0.0</version>

>>

>> </dependency>

>>

>> <dependency>

>>

>> <groupId>org.apache.beam</groupId>

>>

>>    <artifactId>beam-sdks-java-io-kafka</artifactId>

>>

>>    <version>2.0.0</version>

>>

>> </dependency>

>>

>> <dependency>

>>

>> <groupId>org.apache.spark</groupId>

>>

>>   <artifactId>spark-core_2.10</artifactId>

>>

>>   <version>1.6.0</version>

>>

>> </dependency>

>>

>> <dependency>

>>

>> <groupId>org.apache.spark</groupId>

>>

>>   <artifactId>spark-streaming_2.10</artifactId>

>>

>>   <version>1.6.0</version>

>>

>> </dependency>

>>

>>

>>

>> <dependency>

>>

>> <groupId>org.apache.kafka</groupId>

>>

>>   <artifactId>kafka-clients</artifactId>

>>

>>   <version>0.10.1.1</version>

>>

>> </dependency>

>>

>> <dependency>

>>

>> <groupId>org.apache.kafka</groupId>

>>

>>   <artifactId>kafka_2.10</artifactId>

>>

>>   <version>0.10.1.1</version>

>>

>> </dependency>

>>

>>

>>

>>

>>

>> When I use the above code in Spark Runner (Local [4]), this project worked well (2000~4000 data/s). However, if I run it on Standalone mode, it failed along with the above error.

>>

>>

>>

>> If you have any idea about the error ("streaming-job-executor-0"), I am looking forward to hearing from you.

>>

>>

>>

>> Note that: perform command line is “./spark-submit --class com.itri.beam.kafkatest --master spark:// ubuntu8:7077 /root/BeamKafkaAdvanced-0.1-shaded.jar --runner=SparkRunner”

>>

>>

>>

>> Thanks

>>

>>

>>

>> Rick

>>

>>

>>

>>

>>

>>

>>

>> --

>> 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.


--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.


--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.


--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.

RE: kafkaIO Run with Spark Runner: "streaming-job-executor-0"

Posted by li...@itri.org.tw.
Dear all,

The related version in my running env:
Beam 2.4.0 (Spark runner with Standalone mode)
Spark 2.0.0
Kafka: 2.11-0.10.1.1
, and the setting of Pom.xml as mentioned earlier.

When we run the following code:

args=new String[]{"--runner=SparkRunner","--sparkMaster=spark://ubuntu8:7077"};
SparkPipelineOptions options=PipelineOptionsFactory.fromArgs(args).withValidation().as(SparkPipelineOptions.class);

PCollection<String> readData = p

//"Use kafkaIO to inject data"
.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
.withMaxNumRecords(100000)  // If I set this parameter, the code works.;If I do not set this parameter, the executor of Spark will be closed.
    .withoutMetadata())
.apply(Values.<String>create())

//"FixedWindows to collect data"
.apply(Window.<String>in.of(Duration.standardSeconds(1)))
              .triggering(AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
              .withAllowedLateness(Duration.standardSeconds(2))
              .discardingFiredPanes())
//"JdbcData to DB"
.apply(JdbcIO.<Long>write()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
      "org.postgresql.Driver",
      "jdbc:postgresql://ubuntu7:5432/raw_c42a25f4bd3d74429dbeb6162e60e5c7")
      .withUsername("postgres")
      .withPassword("postgres"))
     .withStatement("insert into kafkabeamdata (count) values(?)")
     .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<Long>() {
     @Override
     public void setParameters(Long element, PreparedStatement query)
      throws SQLException {
        double count = element.doubleValue();
        query.setDouble(1, count);}
        }));

p.run();

However, this project need to continuously inject data from kafka and input result into PostgreSQL DB every one second.
If we set parameter “withMaxNumRecords(100000)”, the code works and input result once until the collected data of number is 100000.

In addition, if I run this code under Beam (Spark runner with Local[4]), it is successful (continuously inject data).

Therefore, we are not sure if this may be a bug about kafkaIO?

If any further information is needed, we are glad to be informed and will provide to you as soon as possible.

We are looking forward to hearing from you.

Thanks very much for your attention to our problem.

Sincerely yours,

Rick


From: linrick@itri.org.tw [mailto:linrick@itri.org.tw]
Sent: Thursday, June 14, 2018 2:48 PM
To: user@beam.apache.org
Subject: RE: kafkaIO Run with Spark Runner: "streaming-job-executor-0"


Hi all,



I have changed these versions of tools, as:



Beam 2.4.0 (Spark runner with Standalone mode)

Spark 2.0.0

Kafka: 2.11-0.10.1.1
Pom.xml:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.4.0</version>
</dependency>

<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-spark</artifactId>
   <version>2.4.0</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.4.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>

  <version>2.0.0</version>
</dependency>


<dependency>
<groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.10.1.1</version>

</dependency>



Here, there is new error:



ubuntu9 worker log:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/06/14 14:36:27 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 22990@ubuntu9
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for TERM
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for HUP
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for INT
18/06/14 14:36:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/06/14 14:36:28 INFO SecurityManager: Changing view acls to: root
18/06/14 14:36:28 INFO SecurityManager: Changing modify acls to: root
18/06/14 14:36:28 INFO SecurityManager: Changing view acls groups to:
18/06/14 14:36:28 INFO SecurityManager: Changing modify acls groups to:

…

18/06/14 14:36:28 INFO DiskBlockManager: Created local directory at /tmp/spark-e035a190-2ab4-4167-b0c1-7868bac7afc1/executor-083a9db8-994a-4860-b2fa-d5f490bac01d/blockmgr-568096a9-2060-4717-a3e4-99d4abcee7bd

18/06/14 14:36:28 INFO MemoryStore: MemoryStore started with capacity 5.2 GB

18/06/14 14:36:28 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

B





Ubuntu8 worker log:

18/06/14 14:36:30 INFO MemoryStore: MemoryStore started with capacity 5.2 GB

18/06/14 14:36:30 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@ubuntu7:43572

18/06/14 14:36:30 INFO WorkerWatcher: Connecting to worker spark://Worker@ubuntu8:39499

18/06/14 14:36:30 ERROR CoarseGrainedExecutorBackend: Cannot register with driver: spark://



I have no idea about the error.



In addition, related configures of spark are:



Master node:

Spark-env.sh

export SPARK_MASTER_HOST="xx.xxx.x.x"

export SPARK_MASTER_WEBUI_PORT=1234



Spark-default.conf

spark.driver.memory              10g

spark.executor.memory            2g

spark.executor.instances           4



Worker node:

Spark-env.sh

export SPARK_MASTER_HOST="xx.xxx.x.x"

export SPARK_MASTER_WEBUI_PORT=1234



Spark-default.conf

spark.driver.memory              10g

spark.executor.memory            2g

spark.executor.instances           4



Rick



-----Original Message-----
From: Ismaël Mejía [mailto:iemejia@gmail.com]
Sent: Wednesday, June 13, 2018 11:35 PM
To: user@beam.apache.org<ma...@beam.apache.org>
Subject: Re: kafkaIO Run with Spark Runner: "streaming-job-executor-0"



Can you please update the version of Beam to at least version 2.2.0.

There were some important fixes in streaming after the 2.0.0 release so this could be related. Ideally you should use the latest released version (2.4.0). Remember that starting with Beam 2.3.0 the Spark runner is based on Spark 2.



On Wed, Jun 13, 2018 at 5:11 PM Raghu Angadi <ra...@google.com>> wrote:

>

> Can you check the logs on the worker?

>

> On Wed, Jun 13, 2018 at 2:26 AM <li...@itri.org.tw>> wrote:

>>

>> Dear all,

>>

>>

>>

>> I am using the kafkaIO in my project (Beam 2.0.0 with Spark runner).

>>

>> My running environment is:

>>

>> OS: Ubuntn 14.04.3 LTS

>>

>> The different version for these tools:

>>

>> JAVA: JDK 1.8

>>

>> Beam 2.0.0 (Spark runner with Standalone mode)

>>

>> Spark 1.6.0

>>

>> Standalone mode :One driver node: ubuntu7; One master node: ubuntu8;

>> Two worker nodes: ubuntu8 and ubuntu9

>>

>> Kafka: 2.10-0.10.1.1

>>

>>

>>

>> The java code of my project is:

>>

>> =====================================================================

>> =========

>>

>> SparkPipelineOptions options =

>> PipelineOptionsFactory.as(SparkPipelineOptions.class);

>>

>> options.setRunner(SparkRunner.class);

>>

>> options.setSparkMaster("spark://ubuntu8:7077");

>>

>> options.setAppName("App kafkaBeamTest");

>>

>> options.setJobName("Job kafkaBeamTest");

>>

>> options.setMaxRecordsPerBatch(1000L);

>>

>>

>>

>> Pipeline p = Pipeline.create(options);

>>

>>

>>

>> System.out.println("Beamtokafka");

>>

>> PCollection<KV<Long, String>> readData = p.apply(KafkaIO.<Long,

>> String>read()

>>

>> .withBootstrapServers(ubuntu7:9092)

>>

>> .withTopic("kafkasink")

>>

>> .withKeyDeserializer(LongDeserializer.class)

>>

>> .withValueDeserializer(StringDeserializer.class)

>>

>>        .withoutMetadata()

>>

>>        );

>>

>>

>>

>> PCollection<KV<Long, String>> readDivideData = readData.

>>

>> apply(Window.<KV<Long,String>>into(FixedWindows.of(Duration.standardS

>> econds(1)))

>>

>>      .triggering(AfterWatermark.pastEndOfWindow()

>>

>>

>> .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDel

>> ayOf(Duration.ZERO)))

>>

>>      .withAllowedLateness(Duration.ZERO) .discardingFiredPanes());

>>

>>

>>

>> System.out.println("CountData");

>>

>>

>>

>> PCollection<KV<Long, Long>> countData =

>> readDivideData.apply(Count.perKey());

>>

>>

>>

>> p.run();

>>

>> =====================================================================

>> =========

>>

>>

>>

>> The message of error is:

>>

>> =====================================================================

>> =========

>>

>> Exception in thread "streaming-job-executor-0" java.lang.Error:

>> java.lang.InterruptedException

>>

>>         at

>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.

>> java:1155)

>>

>>         at

>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor

>> .java:624)

>>

>>         at java.lang.Thread.run(Thread.java:748)

>>

>> Caused by: java.lang.InterruptedException

>>

>>         at java.lang.Object.wait(Native Method)

>>

>>         at java.lang.Object.wait(Object.java:502)

>>

>>         at

>> org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)

>>

>>         at

>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612

>> )

>>

>>         at

>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)

>>

>>         at

>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)

>>

>>         at

>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)

>>

>>         at

>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)

>>

>>         at

>> org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)

>>

>>         at

>> org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)

>>

>>         at

>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.s

>> cala:150)

>>

>>         at

>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.s

>> cala:111)

>>

>>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

>>

>>         at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)

>>

>> …

>>

>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)

>>

>> at

>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun

>> $run$1.apply$mcV$sp(JobScheduler.scala:224)

>>

>> at

>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun

>> $run$1.apply(JobScheduler.scala:224)

>>

>> at

>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun

>> $run$1.apply(JobScheduler.scala:224)

>>

>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

>>

>> at

>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobS

>> cheduler.scala:223)

>>

>> at

>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.

>> java:1149)

>>

>> ... 2 more

>>

>> =====================================================================

>> =========

>>

>>

>>

>> Maven 3.5.0, in which related dependencies are listed in my project’s pom.xml:

>>

>> <dependency>

>>

>> <groupId>org.apache.beam</groupId>

>>

>>   <artifactId>beam-sdks-java-core</artifactId>

>>

>>   <version>2.0.0</version>

>>

>> </dependency>

>>

>> <dependency>

>>

>> <groupId>org.apache.beam</groupId>

>>

>>    <artifactId>beam-sdks-java-io-kafka</artifactId>

>>

>>    <version>2.0.0</version>

>>

>> </dependency>

>>

>> <dependency>

>>

>> <groupId>org.apache.spark</groupId>

>>

>>   <artifactId>spark-core_2.10</artifactId>

>>

>>   <version>1.6.0</version>

>>

>> </dependency>

>>

>> <dependency>

>>

>> <groupId>org.apache.spark</groupId>

>>

>>   <artifactId>spark-streaming_2.10</artifactId>

>>

>>   <version>1.6.0</version>

>>

>> </dependency>

>>

>>

>>

>> <dependency>

>>

>> <groupId>org.apache.kafka</groupId>

>>

>>   <artifactId>kafka-clients</artifactId>

>>

>>   <version>0.10.1.1</version>

>>

>> </dependency>

>>

>> <dependency>

>>

>> <groupId>org.apache.kafka</groupId>

>>

>>   <artifactId>kafka_2.10</artifactId>

>>

>>   <version>0.10.1.1</version>

>>

>> </dependency>

>>

>>

>>

>>

>>

>> When I use the above code in Spark Runner (Local [4]), this project worked well (2000~4000 data/s). However, if I run it on Standalone mode, it failed along with the above error.

>>

>>

>>

>> If you have any idea about the error ("streaming-job-executor-0"), I am looking forward to hearing from you.

>>

>>

>>

>> Note that: perform command line is “./spark-submit --class com.itri.beam.kafkatest --master spark:// ubuntu8:7077 /root/BeamKafkaAdvanced-0.1-shaded.jar --runner=SparkRunner”

>>

>>

>>

>> Thanks

>>

>>

>>

>> Rick

>>

>>

>>

>>

>>

>>

>>

>> --

>> 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.


--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.


--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.

RE: kafkaIO Run with Spark Runner: "streaming-job-executor-0"

Posted by li...@itri.org.tw.
Hi all,



I have changed these versions of tools, as:



Beam 2.4.0 (Spark runner with Standalone mode)

Spark 2.0.0

Kafka: 2.11-0.10.1.1
Pom.xml:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.4.0</version>
</dependency>

<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-spark</artifactId>
   <version>2.4.0</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.4.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>

  <version>2.0.0</version>
</dependency>


<dependency>
<groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.10.1.1</version>

</dependency>



Here, there is new error:



ubuntu9 worker log:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/06/14 14:36:27 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 22990@ubuntu9
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for TERM
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for HUP
18/06/14 14:36:27 INFO SignalUtils: Registered signal handler for INT
18/06/14 14:36:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/06/14 14:36:28 INFO SecurityManager: Changing view acls to: root
18/06/14 14:36:28 INFO SecurityManager: Changing modify acls to: root
18/06/14 14:36:28 INFO SecurityManager: Changing view acls groups to:
18/06/14 14:36:28 INFO SecurityManager: Changing modify acls groups to:

…

18/06/14 14:36:28 INFO DiskBlockManager: Created local directory at /tmp/spark-e035a190-2ab4-4167-b0c1-7868bac7afc1/executor-083a9db8-994a-4860-b2fa-d5f490bac01d/blockmgr-568096a9-2060-4717-a3e4-99d4abcee7bd

18/06/14 14:36:28 INFO MemoryStore: MemoryStore started with capacity 5.2 GB

18/06/14 14:36:28 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

B





Ubuntu8 worker log:

18/06/14 14:36:30 INFO MemoryStore: MemoryStore started with capacity 5.2 GB

18/06/14 14:36:30 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@ubuntu7:43572

18/06/14 14:36:30 INFO WorkerWatcher: Connecting to worker spark://Worker@ubuntu8:39499

18/06/14 14:36:30 ERROR CoarseGrainedExecutorBackend: Cannot register with driver: spark://



I have no idea about the error.



In addition, related configures of spark are:



Master node:

Spark-env.sh

export SPARK_MASTER_HOST="xx.xxx.x.x"

export SPARK_MASTER_WEBUI_PORT=1234



Spark-default.conf

spark.driver.memory              10g

spark.executor.memory            2g

spark.executor.instances           4



Worker node:

Spark-env.sh

export SPARK_MASTER_HOST="xx.xxx.x.x"

export SPARK_MASTER_WEBUI_PORT=1234



Spark-default.conf

spark.driver.memory              10g

spark.executor.memory            2g

spark.executor.instances           4



Rick



-----Original Message-----
From: Ismaël Mejía [mailto:iemejia@gmail.com]
Sent: Wednesday, June 13, 2018 11:35 PM
To: user@beam.apache.org
Subject: Re: kafkaIO Run with Spark Runner: "streaming-job-executor-0"



Can you please update the version of Beam to at least version 2.2.0.

There were some important fixes in streaming after the 2.0.0 release so this could be related. Ideally you should use the latest released version (2.4.0). Remember that starting with Beam 2.3.0 the Spark runner is based on Spark 2.



On Wed, Jun 13, 2018 at 5:11 PM Raghu Angadi <ra...@google.com>> wrote:

>

> Can you check the logs on the worker?

>

> On Wed, Jun 13, 2018 at 2:26 AM <li...@itri.org.tw>> wrote:

>>

>> Dear all,

>>

>>

>>

>> I am using the kafkaIO in my project (Beam 2.0.0 with Spark runner).

>>

>> My running environment is:

>>

>> OS: Ubuntn 14.04.3 LTS

>>

>> The different version for these tools:

>>

>> JAVA: JDK 1.8

>>

>> Beam 2.0.0 (Spark runner with Standalone mode)

>>

>> Spark 1.6.0

>>

>> Standalone mode :One driver node: ubuntu7; One master node: ubuntu8;

>> Two worker nodes: ubuntu8 and ubuntu9

>>

>> Kafka: 2.10-0.10.1.1

>>

>>

>>

>> The java code of my project is:

>>

>> =====================================================================

>> =========

>>

>> SparkPipelineOptions options =

>> PipelineOptionsFactory.as(SparkPipelineOptions.class);

>>

>> options.setRunner(SparkRunner.class);

>>

>> options.setSparkMaster("spark://ubuntu8:7077");

>>

>> options.setAppName("App kafkaBeamTest");

>>

>> options.setJobName("Job kafkaBeamTest");

>>

>> options.setMaxRecordsPerBatch(1000L);

>>

>>

>>

>> Pipeline p = Pipeline.create(options);

>>

>>

>>

>> System.out.println("Beamtokafka");

>>

>> PCollection<KV<Long, String>> readData = p.apply(KafkaIO.<Long,

>> String>read()

>>

>> .withBootstrapServers(ubuntu7:9092)

>>

>> .withTopic("kafkasink")

>>

>> .withKeyDeserializer(LongDeserializer.class)

>>

>> .withValueDeserializer(StringDeserializer.class)

>>

>>        .withoutMetadata()

>>

>>        );

>>

>>

>>

>> PCollection<KV<Long, String>> readDivideData = readData.

>>

>> apply(Window.<KV<Long,String>>into(FixedWindows.of(Duration.standardS

>> econds(1)))

>>

>>      .triggering(AfterWatermark.pastEndOfWindow()

>>

>>

>> .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDel

>> ayOf(Duration.ZERO)))

>>

>>      .withAllowedLateness(Duration.ZERO) .discardingFiredPanes());

>>

>>

>>

>> System.out.println("CountData");

>>

>>

>>

>> PCollection<KV<Long, Long>> countData =

>> readDivideData.apply(Count.perKey());

>>

>>

>>

>> p.run();

>>

>> =====================================================================

>> =========

>>

>>

>>

>> The message of error is:

>>

>> =====================================================================

>> =========

>>

>> Exception in thread "streaming-job-executor-0" java.lang.Error:

>> java.lang.InterruptedException

>>

>>         at

>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.

>> java:1155)

>>

>>         at

>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor

>> .java:624)

>>

>>         at java.lang.Thread.run(Thread.java:748)

>>

>> Caused by: java.lang.InterruptedException

>>

>>         at java.lang.Object.wait(Native Method)

>>

>>         at java.lang.Object.wait(Object.java:502)

>>

>>         at

>> org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)

>>

>>         at

>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612

>> )

>>

>>         at

>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)

>>

>>         at

>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)

>>

>>         at

>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)

>>

>>         at

>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)

>>

>>         at

>> org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)

>>

>>         at

>> org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)

>>

>>         at

>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.s

>> cala:150)

>>

>>         at

>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.s

>> cala:111)

>>

>>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

>>

>>         at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)

>>

>> …

>>

>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)

>>

>> at

>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun

>> $run$1.apply$mcV$sp(JobScheduler.scala:224)

>>

>> at

>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun

>> $run$1.apply(JobScheduler.scala:224)

>>

>> at

>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun

>> $run$1.apply(JobScheduler.scala:224)

>>

>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

>>

>> at

>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobS

>> cheduler.scala:223)

>>

>> at

>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.

>> java:1149)

>>

>> ... 2 more

>>

>> =====================================================================

>> =========

>>

>>

>>

>> Maven 3.5.0, in which related dependencies are listed in my project’s pom.xml:

>>

>> <dependency>

>>

>> <groupId>org.apache.beam</groupId>

>>

>>   <artifactId>beam-sdks-java-core</artifactId>

>>

>>   <version>2.0.0</version>

>>

>> </dependency>

>>

>> <dependency>

>>

>> <groupId>org.apache.beam</groupId>

>>

>>    <artifactId>beam-sdks-java-io-kafka</artifactId>

>>

>>    <version>2.0.0</version>

>>

>> </dependency>

>>

>> <dependency>

>>

>> <groupId>org.apache.spark</groupId>

>>

>>   <artifactId>spark-core_2.10</artifactId>

>>

>>   <version>1.6.0</version>

>>

>> </dependency>

>>

>> <dependency>

>>

>> <groupId>org.apache.spark</groupId>

>>

>>   <artifactId>spark-streaming_2.10</artifactId>

>>

>>   <version>1.6.0</version>

>>

>> </dependency>

>>

>>

>>

>> <dependency>

>>

>> <groupId>org.apache.kafka</groupId>

>>

>>   <artifactId>kafka-clients</artifactId>

>>

>>   <version>0.10.1.1</version>

>>

>> </dependency>

>>

>> <dependency>

>>

>> <groupId>org.apache.kafka</groupId>

>>

>>   <artifactId>kafka_2.10</artifactId>

>>

>>   <version>0.10.1.1</version>

>>

>> </dependency>

>>

>>

>>

>>

>>

>> When I use the above code in Spark Runner (Local [4]), this project worked well (2000~4000 data/s). However, if I run it on Standalone mode, it failed along with the above error.

>>

>>

>>

>> If you have any idea about the error ("streaming-job-executor-0"), I am looking forward to hearing from you.

>>

>>

>>

>> Note that: perform command line is “./spark-submit --class com.itri.beam.kafkatest --master spark:// ubuntu8:7077 /root/BeamKafkaAdvanced-0.1-shaded.jar --runner=SparkRunner”

>>

>>

>>

>> Thanks

>>

>>

>>

>> Rick

>>

>>

>>

>>

>>

>>

>>

>> --

>> 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.


--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.

Re: kafkaIO Run with Spark Runner: "streaming-job-executor-0"

Posted by Ismaël Mejía <ie...@gmail.com>.
Can you please update the version of Beam to at least version 2.2.0.
There were some important fixes in streaming after the 2.0.0 release
so this could be related. Ideally you should use the latest released
version (2.4.0). Remember that starting with Beam 2.3.0 the Spark
runner is based on Spark 2.

On Wed, Jun 13, 2018 at 5:11 PM Raghu Angadi <ra...@google.com> wrote:
>
> Can you check the logs on the worker?
>
> On Wed, Jun 13, 2018 at 2:26 AM <li...@itri.org.tw> wrote:
>>
>> Dear all,
>>
>>
>>
>> I am using the kafkaIO in my project (Beam 2.0.0 with Spark runner).
>>
>> My running environment is:
>>
>> OS: Ubuntn 14.04.3 LTS
>>
>> The different version for these tools:
>>
>> JAVA: JDK 1.8
>>
>> Beam 2.0.0 (Spark runner with Standalone mode)
>>
>> Spark 1.6.0
>>
>> Standalone mode :One driver node: ubuntu7; One master node: ubuntu8; Two worker nodes: ubuntu8 and ubuntu9
>>
>> Kafka: 2.10-0.10.1.1
>>
>>
>>
>> The java code of my project is:
>>
>> ==============================================================================
>>
>> SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
>>
>> options.setRunner(SparkRunner.class);
>>
>> options.setSparkMaster("spark://ubuntu8:7077");
>>
>> options.setAppName("App kafkaBeamTest");
>>
>> options.setJobName("Job kafkaBeamTest");
>>
>> options.setMaxRecordsPerBatch(1000L);
>>
>>
>>
>> Pipeline p = Pipeline.create(options);
>>
>>
>>
>> System.out.println("Beamtokafka");
>>
>> PCollection<KV<Long, String>> readData = p.apply(KafkaIO.<Long, String>read()
>>
>> .withBootstrapServers(ubuntu7:9092)
>>
>> .withTopic("kafkasink")
>>
>> .withKeyDeserializer(LongDeserializer.class)
>>
>> .withValueDeserializer(StringDeserializer.class)
>>
>>        .withoutMetadata()
>>
>>        );
>>
>>
>>
>> PCollection<KV<Long, String>> readDivideData = readData.
>>
>> apply(Window.<KV<Long,String>>into(FixedWindows.of(Duration.standardSeconds(1)))
>>
>>      .triggering(AfterWatermark.pastEndOfWindow()
>>
>>        .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
>>
>>      .withAllowedLateness(Duration.ZERO) .discardingFiredPanes());
>>
>>
>>
>> System.out.println("CountData");
>>
>>
>>
>> PCollection<KV<Long, Long>> countData = readDivideData.apply(Count.perKey());
>>
>>
>>
>> p.run();
>>
>> ==============================================================================
>>
>>
>>
>> The message of error is:
>>
>> ==============================================================================
>>
>> Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
>>
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
>>
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>
>>         at java.lang.Thread.run(Thread.java:748)
>>
>> Caused by: java.lang.InterruptedException
>>
>>         at java.lang.Object.wait(Native Method)
>>
>>         at java.lang.Object.wait(Object.java:502)
>>
>>         at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
>>
>>         at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612)
>>
>>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
>>
>>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
>>
>>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
>>
>>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
>>
>>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
>>
>>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
>>
>>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>
>>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>
>>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>
>>         at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
>>
>> …
>>
>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>>
>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>>
>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>>
>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>>
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>
>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>>
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>
>> ... 2 more
>>
>> ==============================================================================
>>
>>
>>
>> Maven 3.5.0, in which related dependencies are listed in my project’s pom.xml:
>>
>> <dependency>
>>
>> <groupId>org.apache.beam</groupId>
>>
>>   <artifactId>beam-sdks-java-core</artifactId>
>>
>>   <version>2.0.0</version>
>>
>> </dependency>
>>
>> <dependency>
>>
>> <groupId>org.apache.beam</groupId>
>>
>>    <artifactId>beam-sdks-java-io-kafka</artifactId>
>>
>>    <version>2.0.0</version>
>>
>> </dependency>
>>
>> <dependency>
>>
>> <groupId>org.apache.spark</groupId>
>>
>>   <artifactId>spark-core_2.10</artifactId>
>>
>>   <version>1.6.0</version>
>>
>> </dependency>
>>
>> <dependency>
>>
>> <groupId>org.apache.spark</groupId>
>>
>>   <artifactId>spark-streaming_2.10</artifactId>
>>
>>   <version>1.6.0</version>
>>
>> </dependency>
>>
>>
>>
>> <dependency>
>>
>> <groupId>org.apache.kafka</groupId>
>>
>>   <artifactId>kafka-clients</artifactId>
>>
>>   <version>0.10.1.1</version>
>>
>> </dependency>
>>
>> <dependency>
>>
>> <groupId>org.apache.kafka</groupId>
>>
>>   <artifactId>kafka_2.10</artifactId>
>>
>>   <version>0.10.1.1</version>
>>
>> </dependency>
>>
>>
>>
>>
>>
>> When I use the above code in Spark Runner (Local [4]), this project worked well (2000~4000 data/s). However, if I run it on Standalone mode, it failed along with the above error.
>>
>>
>>
>> If you have any idea about the error ("streaming-job-executor-0"), I am looking forward to hearing from you.
>>
>>
>>
>> Note that: perform command line is “./spark-submit --class com.itri.beam.kafkatest --master spark:// ubuntu8:7077 /root/BeamKafkaAdvanced-0.1-shaded.jar --runner=SparkRunner”
>>
>>
>>
>> Thanks
>>
>>
>>
>> Rick
>>
>>
>>
>>
>>
>>
>>
>> --
>> 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.

Re: kafkaIO Run with Spark Runner: "streaming-job-executor-0"

Posted by Raghu Angadi <ra...@google.com>.
Can you check the logs on the worker?

On Wed, Jun 13, 2018 at 2:26 AM <li...@itri.org.tw> wrote:

> Dear all,
>
>
>
> I am using the kafkaIO in my project (Beam 2.0.0 with Spark runner).
>
> My running environment is:
>
> OS: Ubuntn 14.04.3 LTS
>
> The different version for these tools:
>
> JAVA: JDK 1.8
>
> Beam 2.0.0 (Spark runner with Standalone mode)
>
> Spark 1.6.0
>
> Standalone mode :One driver node: ubuntu7; One master node: ubuntu8; Two
> worker nodes: ubuntu8 and ubuntu9
>
> Kafka: 2.10-0.10.1.1
>
>
>
> The java code of my project is:
>
>
> ==============================================================================
>
> SparkPipelineOptions options = PipelineOptionsFactory.*as*
> (SparkPipelineOptions.*class*);
>
> options.setRunner(SparkRunner.*class*);
>
> options.setSparkMaster("spark://ubuntu8:7077");
>
> options.setAppName("App kafkaBeamTest");
>
> options.setJobName("Job kafkaBeamTest");
>
> *options**.setMaxRecordsPerBatch(1000L);*
>
>
>
> Pipeline p = Pipeline.create(options);
>
>
>
> System.out.println("Beamtokafka");
>
> PCollection<KV<Long, String>> readData = p.apply(KafkaIO.<Long,
> String>read()
>
> .withBootstrapServers(ubuntu7:9092)
>
> .withTopic("kafkasink")
>
> .withKeyDeserializer(LongDeserializer.class)
>
> .withValueDeserializer(StringDeserializer.class)
>
>        .withoutMetadata()
>
>        );
>
>
>
> PCollection<KV<Long, String>> readDivideData = readData.
>
>
> apply(Window.<KV<Long,String>>into(FixedWindows.of(Duration.standardSeconds(1)))
>
>      .triggering(AfterWatermark.pastEndOfWindow()
>
>
> .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
>
>      .withAllowedLateness(Duration.ZERO) .discardingFiredPanes());
>
>
>
> System.out.println("CountData");
>
>
>
> PCollection<KV<Long, Long>> countData =
> readDivideData.apply(Count.perKey());
>
>
>
> p.run();
>
>
> ==============================================================================
>
>
>
> The message of error is:
>
>
> ==============================================================================
>
> Exception in thread "streaming-job-executor-0" java.lang.Error:
> java.lang.InterruptedException
>
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
>
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>         at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.InterruptedException
>
>         at java.lang.Object.wait(Native Method)
>
>         at java.lang.Object.wait(Object.java:502)
>
>         at
> org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
>
>         at
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612)
>
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
>
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
>
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
>
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
>
>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
>
>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
>
>         at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>
>         at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>
>         at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
>
> …
>
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> ... 2 more
>
>
> ==============================================================================
>
>
>
> Maven 3.5.0, in which related dependencies are listed in my project’s
> pom.xml:
>
> <dependency>
>
> <groupId>org.apache.beam</groupId>
>
>   <artifactId>beam-sdks-java-core</artifactId>
>
>   <version>2.0.0</version>
>
> </dependency>
>
> <dependency>
>
> <groupId>org.apache.beam</groupId>
>
>    <artifactId>beam-sdks-java-io-kafka</artifactId>
>
>    <version>2.0.0</version>
>
> </dependency>
>
> <dependency>
>
> <groupId>org.apache.spark</groupId>
>
>   <artifactId>spark-core_2.10</artifactId>
>
>   <version>1.6.0</version>
>
> </dependency>
>
> <dependency>
>
> <groupId>org.apache.spark</groupId>
>
>   <artifactId>spark-streaming_2.10</artifactId>
>
>   <version>1.6.0</version>
>
> </dependency>
>
>
>
> <dependency>
>
> <groupId>org.apache.kafka</groupId>
>
>   <artifactId>kafka-clients</artifactId>
>
>   <version>0.10.1.1</version>
>
> </dependency>
>
> <dependency>
>
> <groupId>org.apache.kafka</groupId>
>
>   <artifactId>kafka_2.10</artifactId>
>
>   <version>0.10.1.1</version>
>
> </dependency>
>
>
>
>
>
> When I use the above code in Spark Runner (Local [4]), this project worked
> well (2000~4000 data/s). However, if I run it on Standalone mode, it failed
> along with the above error.
>
>
>
> If you have any idea about the error ("streaming-job-executor-0"), I am
> looking forward to hearing from you.
>
>
>
> Note that: perform command line is “./spark-submit --class
> com.itri.beam.kafkatest --master spark:// ubuntu8:7077
> /root/BeamKafkaAdvanced-0.1-shaded.jar --runner=SparkRunner”
>
>
>
> Thanks
>
>
>
> Rick
>
>
>
>
>
>
> --
> 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain
> confidential information. Please do not use or disclose it in any way and
> delete it if you are not the intended recipient.
>