Posted to user@beam.apache.org by Minreng Wu <wu...@gmail.com> on 2020/09/21 07:07:55 UTC

KafkaIO Pipeline Shut down Immediately with the Spark Runner

Hi Contributors,

I have a very simple pipeline that just reads data from KafkaIO, then
prints the parsed data to the console. Below is the main function of my
program:

> public static void main(String[] args) {
>     // create pipeline
>     PipelineOptions pipelineOption = PipelineOptionsFactory.fromArgs(args)
>             .withoutStrictParsing()
>             .as(PipelineOptions.class);
>
>     Pipeline pipeline = Pipeline.create(pipelineOption);
>
>     // define input schema
>     Schema inputSchema = Schema.builder()
>             .addStringField("camera")
>             .addDateTimeField("event_time")
>             .addInt32Field("car")
>             .addInt32Field("person")
>             .build();
>
>     // generate stream source
>     PCollection<Row> rows = pipeline
>             .apply("read kafka", KafkaIO.<String, String>read()
>                     .withBootstrapServers("127.0.0.1:9092")
>                     .withTopic("beamKafkaTest")
>                     .withKeyDeserializer(StringDeserializer.class)
>                     .withValueDeserializer(StringDeserializer.class)
>                     .withReadCommitted()
>                     .commitOffsetsInFinalize()
>                     .withConsumerConfigUpdates(ImmutableMap.of("group.id", "client-1"))
>                     .withoutMetadata()
>             )
>             // parse JSON
>             .apply("parse JSON", ParDo.of(new DoFn<KV<String, String>, Row>() {
>                 @ProcessElement
>                 public void processElement(ProcessContext c) {
>                     String jsonData = c.element().getValue();
>
>                     // parse json
>                     JSONObject jsonObject = JSON.parseObject(jsonData);
>
>                     // build row
>                     List<Object> list = new ArrayList<>();
>                     list.add(jsonObject.get("camera"));
>                     list.add(dtf.parseDateTime((String) jsonObject.get("event_time")));
>                     list.add(jsonObject.get("car"));
>                     list.add(jsonObject.get("person"));
>                     Row row = Row.withSchema(inputSchema)
>                             .addValues(list)
>                             .build();
>
>                     System.out.println(row);
>
>                     // emit row
>                     c.output(row);
>                 }
>             }))
>             // set input schema
>             .setRowSchema(inputSchema);
>
>     // define output schema
>     Schema outputSchema = Schema.builder()
>             .addStringField("camera")
>             .addDateTimeField("event_time")
>             .addInt32Field("car")
>             .addInt32Field("person")
>             .build();
>
>     // print results
>     rows
>             .apply(
>                     "log_result",
>                     MapElements.via(
>                             new SimpleFunction<Row, Row>() {
>                                 @Override
>                                 public Row apply(Row input) {
>                                     // expect output:
>                                     // RESULT: [row, 5.0]
>                                     System.out.println("RESULT: " + input.getValues());
>                                     return input;
>                                 }
>                             }))
>             .setRowSchema(
>                     outputSchema
>             );
>
>     // run
>     pipeline.run();
> }
>
This code works well with the direct runner and the Flink runner. But with
the Spark runner, it shuts down immediately after being submitted to the
Spark cluster, without any error messages. I expected it to keep running
and consuming messages until we stop it, but it behaves like a one-pass
program. I also tested some of the official examples (WordCount,
WindowedWordCount) that use bounded sources (TextIO), and they all worked
well with the Spark runner.

The project structure I used was generated from the Maven archetype provided
on this page
<https://beam.apache.org/get-started/quickstart-java/#get-the-wordcount-code>.
The logs of my submitted Spark application are attached at the bottom of
this email. Since they include the consumer configuration, I guess the Kafka
consumer had already been initialized and the failure happened in a later step.

So my question is: *Why does my KafkaIO program shut down immediately when
running with the Spark runner, but work well with the direct runner and
the Flink runner? Is there any existing example of using KafkaIO with the
Spark runner?*

Really appreciate your help and advice! Stay safe and happy!

Many thanks,
Minreng Wu


Top of Log
>
> Spark Executor Command: "/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/bin/java" "-cp" "/Users/wumrwds/Development/spark-2.4.5-bin-hadoop2.7/conf/:/Users/wumrwds/Development/spark-2.4.5-bin-hadoop2.7/jars/*" "-Xmx1024M" "-Dspark.driver.port=53118" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@10.0.10.123:53118" "--executor-id" "0" "--hostname" "10.0.10.123" "--cores" "12" "--app-id" "app-20200921011535-0003" "--worker-url" "spark://Worker@10.0.10.123:53084"
> ========================================
>
> 20/09/21 01:15:32 WARN Utils: Your hostname, Minrengs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.10.123 instead (on interface en0)
> 20/09/21 01:15:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
> 20/09/21 01:15:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> log4j:WARN No appenders could be found for logger (org.apache.beam.sdk.options.PipelineOptionsFactory).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> 20/09/21 01:15:35 WARN Checkpoint: Checkpoint directory /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd/spark-checkpoint does not exist
> 20/09/21 01:15:35 INFO SparkRunnerStreamingContextFactory: Creating a new Spark Streaming Context
> 20/09/21 01:15:35 INFO SparkRunnerStreamingContextFactory: Setting Spark streaming batchDuration to 500 msec
> 20/09/21 01:15:35 INFO SparkContextFactory: Creating a brand new Spark Context.
> 20/09/21 01:15:35 INFO SparkContext: Running Spark version 2.4.5
> 20/09/21 01:15:35 INFO SparkContext: Submitted application: BeamSqlApp2
> 20/09/21 01:15:35 INFO SecurityManager: Changing view acls to: wumrwds
> 20/09/21 01:15:35 INFO SecurityManager: Changing modify acls to: wumrwds
> 20/09/21 01:15:35 INFO SecurityManager: Changing view acls groups to:
> 20/09/21 01:15:35 INFO SecurityManager: Changing modify acls groups to:
> 20/09/21 01:15:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(wumrwds); groups with view permissions: Set(); users  with modify permissions: Set(wumrwds); groups with modify permissions: Set()
> 20/09/21 01:15:35 INFO Utils: Successfully started service 'sparkDriver' on port 53118.
> 20/09/21 01:15:35 INFO SparkEnv: Registering MapOutputTracker
> 20/09/21 01:15:35 INFO SparkEnv: Registering BlockManagerMaster
> 20/09/21 01:15:35 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
> 20/09/21 01:15:35 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
> 20/09/21 01:15:35 INFO DiskBlockManager: Created local directory at /private/var/folders/kl/98ycp0vj19z082xtn76h07r00000gn/T/blockmgr-e26bb6e9-1c3a-45ee-b176-abb7ca03ae4f
> 20/09/21 01:15:35 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
> 20/09/21 01:15:35 INFO SparkEnv: Registering OutputCommitCoordinator
> 20/09/21 01:15:35 INFO Utils: Successfully started service 'SparkUI' on port 4040.
> 20/09/21 01:15:35 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.0.10.123:4040
> 20/09/21 01:15:35 INFO SparkContext: Added JAR file:/Users/wumrwds/Git/play/beam-play/word-count-beam/target/word-count-beam-bundled-0.1.jar at spark://10.0.10.123:53118/jars/word-count-beam-bundled-0.1.jar with timestamp 1600668935676
> 20/09/21 01:15:35 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://Minrengs-MacBook-Pro.local:7077...
> 20/09/21 01:15:35 INFO TransportClientFactory: Successfully created connection to Minrengs-MacBook-Pro.local/127.0.0.1:7077 after 26 ms (0 ms spent in bootstraps)
> 20/09/21 01:15:35 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20200921011535-0003
> 20/09/21 01:15:35 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20200921011535-0003/0 on worker-20200921011236-10.0.10.123-53084 (10.0.10.123:53084) with 12 core(s)
> 20/09/21 01:15:35 INFO StandaloneSchedulerBackend: Granted executor ID app-20200921011535-0003/0 on hostPort 10.0.10.123:53084 with 12 core(s), 1024.0 MB RAM
> 20/09/21 01:15:35 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53120.
> 20/09/21 01:15:35 INFO NettyBlockTransferService: Server created on 10.0.10.123:53120
> 20/09/21 01:15:35 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
> 20/09/21 01:15:35 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.0.10.123, 53120, None)
> 20/09/21 01:15:35 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20200921011535-0003/0 is now RUNNING
> 20/09/21 01:15:35 INFO BlockManagerMasterEndpoint: Registering block manager 10.0.10.123:53120 with 366.3 MB RAM, BlockManagerId(driver, 10.0.10.123, 53120, None)
> 20/09/21 01:15:35 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.0.10.123, 53120, None)
> 20/09/21 01:15:35 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.0.10.123, 53120, None)
> 20/09/21 01:15:35 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
> 20/09/21 01:15:35 WARN Checkpoint$CheckpointDir: The specified checkpoint dir /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd does not match a reliable filesystem so in case of failures this job may not recover properly or even at all.
> 20/09/21 01:15:35 INFO Checkpoint$CheckpointDir: Checkpoint dir set to: /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd
> 20/09/21 01:15:36 INFO MetricsAccumulator: No metrics checkpoint found.
> 20/09/21 01:15:36 INFO MetricsAccumulator: Instantiated metrics accumulator: MetricQueryResults()
> 20/09/21 01:15:36 WARN Checkpoint$CheckpointDir: The specified checkpoint dir /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd does not match a reliable filesystem so in case of failures this job may not recover properly or even at all.
> 20/09/21 01:15:36 INFO Checkpoint$CheckpointDir: Checkpoint dir set to: /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd
> 20/09/21 01:15:36 INFO AggregatorsAccumulator: No accumulator checkpoint found.
> 20/09/21 01:15:36 INFO AggregatorsAccumulator: Instantiated aggregators accumulator:
> 20/09/21 01:15:36 INFO SparkRunner$Evaluator: Evaluating Read(KafkaUnboundedSource)
> 20/09/21 01:15:36 INFO PIDRateEstimator: Created PIDRateEstimator with proportional = 1.0, integral = 0.2, derivative = 0.0, min rate = 100.0
> 20/09/21 01:15:36 INFO SourceDStream: Read duration set to: PT0.200S
> 20/09/21 01:15:36 INFO SourceDStream: Max records per batch has not been limited by neither configuration nor the rate controller, and will remain unlimited for the current batch (9223372036854775807).
> 20/09/21 01:15:36 INFO ConsumerConfig: ConsumerConfig values:
> 	allow.auto.create.topics = true
> 	auto.commit.interval.ms = 5000
> 	auto.offset.reset = latest
> 	bootstrap.servers = [10.0.10.123:9092]
> 	check.crcs = true
> 	client.dns.lookup = default
> 	client.id =
> 	client.rack =
> 	connections.max.idle.ms = 540000
> 	default.api.timeout.ms = 60000
> 	enable.auto.commit = false
> 	exclude.internal.topics = true
> 	fetch.max.bytes = 52428800
> 	fetch.max.wait.ms = 500
> 	fetch.min.bytes = 1
> 	group.id = client-1
> 	group.instance.id = null
> 	heartbeat.interval.ms = 3000
> 	interceptor.classes = []
> 	internal.leave.group.on.close = true
> 	isolation.level = read_committed
> 	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> 	max.partition.fetch.bytes = 1048576
> 	max.poll.interval.ms = 300000
> 	max.poll.records = 500
> 	metadata.max.age.ms = 300000
> 	metric.reporters = []
> 	metrics.num.samples = 2
> 	metrics.recording.level = INFO
> 	metrics.sample.window.ms = 30000
> 	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> 	receive.buffer.bytes = 524288
> 	reconnect.backoff.max.ms = 1000
> 	reconnect.backoff.ms = 50
> 	request.timeout.ms = 30000
> 	retry.backoff.ms = 100
> 	sasl.client.callback.handler.class = null
> 	sasl.jaas.config = null
> 	sasl.kerberos.kinit.cmd = /usr/bin/kinit
> 	sasl.kerberos.min.time.before.relogin = 60000
> 	sasl.kerberos.service.name = null
> 	sasl.kerberos.ticket.renew.jitter = 0.05
> 	sasl.kerberos.ticket.renew.window.factor = 0.8
> 	sasl.login.callback.handler.class = null
> 	sasl.login.class = null
> 	sasl.login.refresh.buffer.seconds = 300
> 	sasl.login.refresh.min.period.seconds = 60
> 	sasl.login.refresh.window.factor = 0.8
> 	sasl.login.refresh.window.jitter = 0.05
> 	sasl.mechanism = GSSAPI
> 	security.protocol = PLAINTEXT
> 	security.providers = null
> 	send.buffer.bytes = 131072
> 	session.timeout.ms = 10000
> 	ssl.cipher.suites = null
> 	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> 	ssl.endpoint.identification.algorithm = https
> 	ssl.key.password = null
> 	ssl.keymanager.algorithm = SunX509
> 	ssl.keystore.location = null
> 	ssl.keystore.password = null
> 	ssl.keystore.type = JKS
> 	ssl.protocol = TLS
> 	ssl.provider = null
> 	ssl.secure.random.implementation = null
> 	ssl.trustmanager.algorithm = PKIX
> 	ssl.truststore.location = null
> 	ssl.truststore.password = null
> 	ssl.truststore.type = JKS
> 	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>
> 20/09/21 01:15:36 INFO AppInfoParser: Kafka version: 2.4.1
> 20/09/21 01:15:36 INFO AppInfoParser: Kafka commitId: c57222ae8cd7866b
> 20/09/21 01:15:36 INFO AppInfoParser: Kafka startTimeMs: 1600668936638
> 20/09/21 01:15:37 INFO Metadata: [Consumer clientId=consumer-client-1-1, groupId=client-1] Cluster ID: jYuebrB3RQKaA55bNA1Vsw
> 20/09/21 01:15:37 INFO KafkaUnboundedSource: Partitions assigned to split 0 (total 1): beamKafkaTest-0
> 20/09/21 01:15:37 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1@63718b93
> 20/09/21 01:15:37 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.examples.BeamSqlApp2$1@4930539b
> 20/09/21 01:15:37 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.sdk.transforms.MapElements$1@37b72ea
> 20/09/21 01:15:37 WARN SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on the local filesystem. Directory 'file:/tmp/beamsqlapp2-wumrwds-0921061534-60408bcd/spark-checkpoint' appears to be on the local filesystem.
> 20/09/21 01:15:37 INFO SparkRunner: Starting streaming pipeline execution.
> 20/09/21 01:15:37 INFO SourceDStream: Duration for remembering RDDs set to 10000 ms for org.apache.beam.runners.spark.io.SourceDStream@49db1a95
> 20/09/21 01:15:37 INFO SourceDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO SourceDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO SourceDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO SourceDStream: Remember interval = 10000 ms
> 20/09/21 01:15:37 INFO SourceDStream: Initialized and validated org.apache.beam.runners.spark.io.SourceDStream@49db1a95
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Storage level = Memory Deserialized 1x Replicated
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Checkpoint interval = 5000 ms
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Remember interval = 10000 ms
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Initialized and validated org.apache.spark.streaming.dstream.InternalMapWithStateDStream@5b0151fb
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Slide time = 500 ms
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Checkpoint interval = null
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Initialized and validated org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@6ff117df
> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@6275db71
> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Initialized and validated org.apache.beam.runners.spark.io.SparkUnboundedSource$ReadReportDStream@3ef0d3c5
> 20/09/21 01:15:37 INFO SourceDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO SourceDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO SourceDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO SourceDStream: Remember interval = 10000 ms
> 20/09/21 01:15:37 INFO SourceDStream: Initialized and validated org.apache.beam.runners.spark.io.SourceDStream@49db1a95
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Storage level = Memory Deserialized 1x Replicated
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Checkpoint interval = 5000 ms
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Remember interval = 10000 ms
> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Initialized and validated org.apache.spark.streaming.dstream.InternalMapWithStateDStream@5b0151fb
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Slide time = 500 ms
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Checkpoint interval = null
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Initialized and validated org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@6ff117df
> 20/09/21 01:15:37 INFO FlatMappedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO FlatMappedDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO FlatMappedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO FlatMappedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@4c293f9c
> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@3430988a
> 20/09/21 01:15:37 INFO TransformedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO TransformedDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO TransformedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO TransformedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO TransformedDStream: Initialized and validated org.apache.spark.streaming.dstream.TransformedDStream@17ab8c32
> 20/09/21 01:15:37 INFO FilteredDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO FilteredDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO FilteredDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO FilteredDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO FilteredDStream: Initialized and validated org.apache.spark.streaming.dstream.FilteredDStream@14660807
> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@8f826ef
> 20/09/21 01:15:37 INFO TransformedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO TransformedDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO TransformedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO TransformedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO TransformedDStream: Initialized and validated org.apache.spark.streaming.dstream.TransformedDStream@7e734051
> 20/09/21 01:15:37 INFO FilteredDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO FilteredDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO FilteredDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO FilteredDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO FilteredDStream: Initialized and validated org.apache.spark.streaming.dstream.FilteredDStream@1be77a7
> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@25bae116
> 20/09/21 01:15:37 INFO TransformedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO TransformedDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO TransformedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO TransformedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO TransformedDStream: Initialized and validated org.apache.spark.streaming.dstream.TransformedDStream@6e41c8a5
> 20/09/21 01:15:37 INFO FilteredDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO FilteredDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO FilteredDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO FilteredDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO FilteredDStream: Initialized and validated org.apache.spark.streaming.dstream.FilteredDStream@362e3875
> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@22aaf27c
> 20/09/21 01:15:37 INFO ForEachDStream: Slide time = 500 ms
> 20/09/21 01:15:37 INFO ForEachDStream: Storage level = Serialized 1x Replicated
> 20/09/21 01:15:37 INFO ForEachDStream: Checkpoint interval = null
> 20/09/21 01:15:37 INFO ForEachDStream: Remember interval = 500 ms
> 20/09/21 01:15:37 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@3bd7e62b
> 20/09/21 01:15:37 INFO RecurringTimer: Started timer for JobGenerator at time 1600668938000
> 20/09/21 01:15:37 INFO JobGenerator: Started JobGenerator at 1600668938000 ms
> 20/09/21 01:15:37 INFO JobScheduler: Started JobScheduler
> 20/09/21 01:15:37 INFO StreamingContext: StreamingContext started
> 20/09/21 01:15:37 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
> 20/09/21 01:15:37 INFO BatchedWriteAheadLog: BatchedWriteAheadLog shutting down at time: 1600668937717.
> 20/09/21 01:15:37 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer queue interrupted.
> 20/09/21 01:15:37 INFO BatchedWriteAheadLog: BatchedWriteAheadLog Writer thread exiting.
> 20/09/21 01:15:37 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Stopped write ahead log manager
> 20/09/21 01:15:37 INFO ReceiverTracker: ReceiverTracker stopped
> 20/09/21 01:15:37 INFO JobGenerator: Stopping JobGenerator immediately
> 20/09/21 01:15:37 INFO RecurringTimer: Stopped timer for JobGenerator after time -1
> 20/09/21 01:15:37 INFO CheckpointWriter: CheckpointWriter executor terminated? true, waited for 0 ms.
> 20/09/21 01:15:37 INFO JobGenerator: Stopped JobGenerator
> 20/09/21 01:15:37 INFO JobScheduler: Stopped JobScheduler
> 20/09/21 01:15:37 INFO StreamingContext: StreamingContext stopped successfully
> 20/09/21 01:15:37 INFO SparkContext: Invoking stop() from shutdown hook
> 20/09/21 01:15:37 INFO SparkUI: Stopped Spark web UI at http://10.0.10.123:4040
> 20/09/21 01:15:37 INFO StandaloneSchedulerBackend: Shutting down all executors
> 20/09/21 01:15:37 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
> 20/09/21 01:15:37 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
> 20/09/21 01:15:37 INFO MemoryStore: MemoryStore cleared
> 20/09/21 01:15:37 INFO BlockManager: BlockManager stopped
> 20/09/21 01:15:37 INFO BlockManagerMaster: BlockManagerMaster stopped
> 20/09/21 01:15:37 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
> 20/09/21 01:15:37 INFO SparkContext: Successfully stopped SparkContext
> 20/09/21 01:15:37 INFO ShutdownHookManager: Shutdown hook called
> 20/09/21 01:15:37 INFO ShutdownHookManager: Deleting directory /private/var/folders/kl/98ycp0vj19z082xtn76h07r00000gn/T/spark-fac2b91e-8bfc-4855-b0f5-bbb206bb678b
> 20/09/21 01:15:37 INFO ShutdownHookManager: Deleting directory /private/var/folders/kl/98ycp0vj19z082xtn76h07r00000gn/T/spark-e3447ff1-ca22-4574-9864-834fac63eea3
>
>

Re: KafkaIO Pipeline Shut down Immediately with the Spark Runner

Posted by Minreng Wu <wu...@gmail.com>.
Hi all,

I just found the reason why my application shut down immediately: I didn't
call the `.waitUntilFinish()` method on the result of `pipeline.run()`, so
the main method returned right after the pipeline was started.
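
In the posted program, the fix amounts to replacing the final
`pipeline.run();` with `pipeline.run().waitUntilFinish();`. To illustrate why
the missing call matters, here is a minimal, stdlib-only sketch. It is not
Beam code: `FakeResult` and `run(int)` are hypothetical stand-ins, and it
assumes the runner does its streaming work on background threads that do not
keep the JVM alive by themselves.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class WaitUntilFinishSketch {

    /** Hypothetical stand-in for a pipeline result handle; not a Beam class. */
    static final class FakeResult {
        private final CountDownLatch done = new CountDownLatch(1);
        final AtomicInteger processed = new AtomicInteger();

        /** Blocks the caller until the background work signals completion. */
        void waitUntilFinish() {
            try {
                done.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Starts work on a daemon thread and returns immediately (a non-blocking "run"). */
    static FakeResult run(int batches) {
        FakeResult result = new FakeResult();
        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < batches; i++) {
                    Thread.sleep(50); // pretend to process one micro-batch
                    result.processed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                result.done.countDown(); // signal completion to any waiter
            }
        });
        worker.setDaemon(true); // daemon threads do not keep the JVM alive
        worker.start();
        return result;
    }

    public static void main(String[] args) {
        FakeResult result = run(3);
        // Without this call, main() would return right away and the JVM
        // could exit before any batch has been processed.
        result.waitUntilFinish();
        System.out.println("batches processed: " + result.processed.get());
        // prints: batches processed: 3
    }
}
```

Commenting out the `result.waitUntilFinish()` line in this sketch reproduces
the symptom in miniature: the program starts its worker and then exits
without reporting an error.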

Sorry for bothering you all due to such a stupid mistake! Thanks a lot!
Have a nice week!

Many thanks,
Minreng

On Mon, Sep 21, 2020 at 2:07 AM Minreng Wu <wu...@gmail.com> wrote:

> Hi Contributors,
>
> I have a very simple pipeline that just reads data from KafkaIO, then
> prints the parsed data to the console. Below is the main function of my
> program:
>
>> public static void main(String[] args) {
>>     // create pipeline
>>     PipelineOptions pipelineOption = PipelineOptionsFactory.fromArgs(args)
>>             .withoutStrictParsing()
>>             .as(PipelineOptions.class);
>>
>>     Pipeline pipeline = Pipeline.create(pipelineOption);
>>
>>     // define input schema
>>     Schema inputSchema = Schema.builder()
>>             .addStringField("camera")
>>             .addDateTimeField("event_time")
>>             .addInt32Field("car")
>>             .addInt32Field("person")
>>             .build();
>>
>>     // generate stream source
>>     PCollection<Row> rows = pipeline
>>             .apply("read kafka", KafkaIO.<String, String>read()
>>                     .withBootstrapServers("127.0.0.1:9092")
>>                     .withTopic("beamKafkaTest")
>>                     .withKeyDeserializer(StringDeserializer.class)
>>                     .withValueDeserializer(StringDeserializer.class)
>>                     .withReadCommitted()
>>                     .commitOffsetsInFinalize()
>>                     .withConsumerConfigUpdates(ImmutableMap.of("group.id", "client-1"))
>>                     .withoutMetadata()
>>             )
>>             // parse JSON
>>             .apply("parse JSON", ParDo.of(new DoFn<KV<String, String>, Row>() {
>>                 @ProcessElement
>>                 public void processElement(ProcessContext c) {
>>                     String jsonData = c.element().getValue();
>>
>>                     // parse json
>>                     JSONObject jsonObject = JSON.parseObject(jsonData);
>>
>>                     // build row
>>                     List<Object> list = new ArrayList<>();
>>                     list.add(jsonObject.get("camera"));
>>                     list.add(dtf.parseDateTime((String) jsonObject.get("event_time")));
>>                     list.add(jsonObject.get("car"));
>>                     list.add(jsonObject.get("person"));
>>                     Row row = Row.withSchema(inputSchema)
>>                             .addValues(list)
>>                             .build();
>>
>>                     System.out.println(row);
>>
>>                     // emit row
>>                     c.output(row);
>>                 }
>>             }))
>>             // set input schema
>>             .setRowSchema(inputSchema);
>>
>>     // define output schema
>>     Schema outputSchema = Schema.builder()
>>             .addStringField("camera")
>>             .addDateTimeField("event_time")
>>             .addInt32Field("car")
>>             .addInt32Field("person")
>>             .build();
>>
>>     // print results
>>     rows
>>             .apply(
>>                     "log_result",
>>                     MapElements.via(
>>                             new SimpleFunction<Row, Row>() {
>>                                 @Override
>>                                 public Row apply(Row input) {
>>                                     // expect output:
>>                                     // RESULT: [row, 5.0]
>>                                     System.out.println("RESULT: " + input.getValues());
>>                                     return input;
>>                                 }
>>                             }))
>>             .setRowSchema(
>>                     outputSchema
>>             );
>>
>>     // run
>>     pipeline.run();
>> }
>>
>> This code works well with the direct runner and the Flink runner. But
> when it comes to the Spark runner, it shut down immediately after submitted
> to the spark cluster without any error messages. As expected it should keep
> running and consuming messages until we stop it. But now it just performs
> like a one-pass program. And I tested some of the official examples
> (WordCount, WindowedWordCount) that goes with the bounded sources(TextIO),
> they all worked well with the Spark Runner.
>
> The project structure I used is generated from the maven archetype
> provided on this page
> <https://beam.apache.org/get-started/quickstart-java/#get-the-wordcount-code>.
> The logs of my submitted spark application are attached at the bottom of
> this email. Since it printed the consumer configurations, I guess it had
> already initialized the Kafka consumer, but failed in the following steps.
>
> So my question is: *Why does my KafkaIO program shut down immediately
> when running with the Spark runner, but work well with the direct runner
> and the Flink runner? Is there any existing example of using KafkaIO with the
> Spark runner?*
>
> Really appreciate your help and advice! Stay safe and happy!
>
> Many thanks,
> Minreng Wu
>
>
> Top of Log
>>
>> Spark Executor Command: "/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/bin/java" "-cp" "/Users/wumrwds/Development/spark-2.4.5-bin-hadoop2.7/conf/:/Users/wumrwds/Development/spark-2.4.5-bin-hadoop2.7/jars/*" "-Xmx1024M" "-Dspark.driver.port=53118" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@10.0.10.123:53118" "--executor-id" "0" "--hostname" "10.0.10.123" "--cores" "12" "--app-id" "app-20200921011535-0003" "--worker-url" "spark://Worker@10.0.10.123:53084"
>> ========================================
>>
>> 20/09/21 01:15:32 WARN Utils: Your hostname, Minrengs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.10.123 instead (on interface en0)
>> 20/09/21 01:15:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
>> 20/09/21 01:15:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>> log4j:WARN No appenders could be found for logger (org.apache.beam.sdk.options.PipelineOptionsFactory).
>> log4j:WARN Please initialize the log4j system properly.
>> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
>> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
>> 20/09/21 01:15:35 WARN Checkpoint: Checkpoint directory /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd/spark-checkpoint does not exist
>> 20/09/21 01:15:35 INFO SparkRunnerStreamingContextFactory: Creating a new Spark Streaming Context
>> 20/09/21 01:15:35 INFO SparkRunnerStreamingContextFactory: Setting Spark streaming batchDuration to 500 msec
>> 20/09/21 01:15:35 INFO SparkContextFactory: Creating a brand new Spark Context.
>> 20/09/21 01:15:35 INFO SparkContext: Running Spark version 2.4.5
>> 20/09/21 01:15:35 INFO SparkContext: Submitted application: BeamSqlApp2
>> 20/09/21 01:15:35 INFO SecurityManager: Changing view acls to: wumrwds
>> 20/09/21 01:15:35 INFO SecurityManager: Changing modify acls to: wumrwds
>> 20/09/21 01:15:35 INFO SecurityManager: Changing view acls groups to:
>> 20/09/21 01:15:35 INFO SecurityManager: Changing modify acls groups to:
>> 20/09/21 01:15:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(wumrwds); groups with view permissions: Set(); users  with modify permissions: Set(wumrwds); groups with modify permissions: Set()
>> 20/09/21 01:15:35 INFO Utils: Successfully started service 'sparkDriver' on port 53118.
>> 20/09/21 01:15:35 INFO SparkEnv: Registering MapOutputTracker
>> 20/09/21 01:15:35 INFO SparkEnv: Registering BlockManagerMaster
>> 20/09/21 01:15:35 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
>> 20/09/21 01:15:35 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
>> 20/09/21 01:15:35 INFO DiskBlockManager: Created local directory at /private/var/folders/kl/98ycp0vj19z082xtn76h07r00000gn/T/blockmgr-e26bb6e9-1c3a-45ee-b176-abb7ca03ae4f
>> 20/09/21 01:15:35 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
>> 20/09/21 01:15:35 INFO SparkEnv: Registering OutputCommitCoordinator
>> 20/09/21 01:15:35 INFO Utils: Successfully started service 'SparkUI' on port 4040.
>> 20/09/21 01:15:35 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.0.10.123:4040
>> 20/09/21 01:15:35 INFO SparkContext: Added JAR file:/Users/wumrwds/Git/play/beam-play/word-count-beam/target/word-count-beam-bundled-0.1.jar at spark://10.0.10.123:53118/jars/word-count-beam-bundled-0.1.jar with timestamp 1600668935676
>> 20/09/21 01:15:35 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://Minrengs-MacBook-Pro.local:7077...
>> 20/09/21 01:15:35 INFO TransportClientFactory: Successfully created connection to Minrengs-MacBook-Pro.local/127.0.0.1:7077 after 26 ms (0 ms spent in bootstraps)
>> 20/09/21 01:15:35 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20200921011535-0003
>> 20/09/21 01:15:35 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20200921011535-0003/0 on worker-20200921011236-10.0.10.123-53084 (10.0.10.123:53084) with 12 core(s)
>> 20/09/21 01:15:35 INFO StandaloneSchedulerBackend: Granted executor ID app-20200921011535-0003/0 on hostPort 10.0.10.123:53084 with 12 core(s), 1024.0 MB RAM
>> 20/09/21 01:15:35 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53120.
>> 20/09/21 01:15:35 INFO NettyBlockTransferService: Server created on 10.0.10.123:53120
>> 20/09/21 01:15:35 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
>> 20/09/21 01:15:35 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.0.10.123, 53120, None)
>> 20/09/21 01:15:35 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20200921011535-0003/0 is now RUNNING
>> 20/09/21 01:15:35 INFO BlockManagerMasterEndpoint: Registering block manager 10.0.10.123:53120 with 366.3 MB RAM, BlockManagerId(driver, 10.0.10.123, 53120, None)
>> 20/09/21 01:15:35 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.0.10.123, 53120, None)
>> 20/09/21 01:15:35 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.0.10.123, 53120, None)
>> 20/09/21 01:15:35 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
>> 20/09/21 01:15:35 WARN Checkpoint$CheckpointDir: The specified checkpoint dir /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd does not match a reliable filesystem so in case of failures this job may not recover properly or even at all.
>> 20/09/21 01:15:35 INFO Checkpoint$CheckpointDir: Checkpoint dir set to: /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd
>> 20/09/21 01:15:36 INFO MetricsAccumulator: No metrics checkpoint found.
>> 20/09/21 01:15:36 INFO MetricsAccumulator: Instantiated metrics accumulator: MetricQueryResults()
>> 20/09/21 01:15:36 WARN Checkpoint$CheckpointDir: The specified checkpoint dir /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd does not match a reliable filesystem so in case of failures this job may not recover properly or even at all.
>> 20/09/21 01:15:36 INFO Checkpoint$CheckpointDir: Checkpoint dir set to: /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd
>> 20/09/21 01:15:36 INFO AggregatorsAccumulator: No accumulator checkpoint found.
>> 20/09/21 01:15:36 INFO AggregatorsAccumulator: Instantiated aggregators accumulator:
>> 20/09/21 01:15:36 INFO SparkRunner$Evaluator: Evaluating Read(KafkaUnboundedSource)
>> 20/09/21 01:15:36 INFO PIDRateEstimator: Created PIDRateEstimator with proportional = 1.0, integral = 0.2, derivative = 0.0, min rate = 100.0
>> 20/09/21 01:15:36 INFO SourceDStream: Read duration set to: PT0.200S
>> 20/09/21 01:15:36 INFO SourceDStream: Max records per batch has not been limited by neither configuration nor the rate controller, and will remain unlimited for the current batch (9223372036854775807).
>> 20/09/21 01:15:36 INFO ConsumerConfig: ConsumerConfig values:
>> 	allow.auto.create.topics = true
>> 	auto.commit.interval.ms = 5000
>> 	auto.offset.reset = latest
>> 	bootstrap.servers = [10.0.10.123:9092]
>> 	check.crcs = true
>> 	client.dns.lookup = default
>> 	client.id =
>> 	client.rack =
>> 	connections.max.idle.ms = 540000
>> 	default.api.timeout.ms = 60000
>> 	enable.auto.commit = false
>> 	exclude.internal.topics = true
>> 	fetch.max.bytes = 52428800
>> 	fetch.max.wait.ms = 500
>> 	fetch.min.bytes = 1
>> 	group.id = client-1
>> 	group.instance.id = null
>> 	heartbeat.interval.ms = 3000
>> 	interceptor.classes = []
>> 	internal.leave.group.on.close = true
>> 	isolation.level = read_committed
>> 	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>> 	max.partition.fetch.bytes = 1048576
>> 	max.poll.interval.ms = 300000
>> 	max.poll.records = 500
>> 	metadata.max.age.ms = 300000
>> 	metric.reporters = []
>> 	metrics.num.samples = 2
>> 	metrics.recording.level = INFO
>> 	metrics.sample.window.ms = 30000
>> 	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>> 	receive.buffer.bytes = 524288
>> 	reconnect.backoff.max.ms = 1000
>> 	reconnect.backoff.ms = 50
>> 	request.timeout.ms = 30000
>> 	retry.backoff.ms = 100
>> 	sasl.client.callback.handler.class = null
>> 	sasl.jaas.config = null
>> 	sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> 	sasl.kerberos.min.time.before.relogin = 60000
>> 	sasl.kerberos.service.name = null
>> 	sasl.kerberos.ticket.renew.jitter = 0.05
>> 	sasl.kerberos.ticket.renew.window.factor = 0.8
>> 	sasl.login.callback.handler.class = null
>> 	sasl.login.class = null
>> 	sasl.login.refresh.buffer.seconds = 300
>> 	sasl.login.refresh.min.period.seconds = 60
>> 	sasl.login.refresh.window.factor = 0.8
>> 	sasl.login.refresh.window.jitter = 0.05
>> 	sasl.mechanism = GSSAPI
>> 	security.protocol = PLAINTEXT
>> 	security.providers = null
>> 	send.buffer.bytes = 131072
>> 	session.timeout.ms = 10000
>> 	ssl.cipher.suites = null
>> 	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> 	ssl.endpoint.identification.algorithm = https
>> 	ssl.key.password = null
>> 	ssl.keymanager.algorithm = SunX509
>> 	ssl.keystore.location = null
>> 	ssl.keystore.password = null
>> 	ssl.keystore.type = JKS
>> 	ssl.protocol = TLS
>> 	ssl.provider = null
>> 	ssl.secure.random.implementation = null
>> 	ssl.trustmanager.algorithm = PKIX
>> 	ssl.truststore.location = null
>> 	ssl.truststore.password = null
>> 	ssl.truststore.type = JKS
>> 	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>>
>> 20/09/21 01:15:36 INFO AppInfoParser: Kafka version: 2.4.1
>> 20/09/21 01:15:36 INFO AppInfoParser: Kafka commitId: c57222ae8cd7866b
>> 20/09/21 01:15:36 INFO AppInfoParser: Kafka startTimeMs: 1600668936638
>> 20/09/21 01:15:37 INFO Metadata: [Consumer clientId=consumer-client-1-1, groupId=client-1] Cluster ID: jYuebrB3RQKaA55bNA1Vsw
>> 20/09/21 01:15:37 INFO KafkaUnboundedSource: Partitions assigned to split 0 (total 1): beamKafkaTest-0
>> 20/09/21 01:15:37 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1@63718b93
>> 20/09/21 01:15:37 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.examples.BeamSqlApp2$1@4930539b
>> 20/09/21 01:15:37 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.sdk.transforms.MapElements$1@37b72ea
>> 20/09/21 01:15:37 WARN SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on the local filesystem. Directory 'file:/tmp/beamsqlapp2-wumrwds-0921061534-60408bcd/spark-checkpoint' appears to be on the local filesystem.
>> 20/09/21 01:15:37 INFO SparkRunner: Starting streaming pipeline execution.
>> 20/09/21 01:15:37 INFO SourceDStream: Duration for remembering RDDs set to 10000 ms for org.apache.beam.runners.spark.io.SourceDStream@49db1a95
>> 20/09/21 01:15:37 INFO SourceDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO SourceDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO SourceDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO SourceDStream: Remember interval = 10000 ms
>> 20/09/21 01:15:37 INFO SourceDStream: Initialized and validated org.apache.beam.runners.spark.io.SourceDStream@49db1a95
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Storage level = Memory Deserialized 1x Replicated
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Checkpoint interval = 5000 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Remember interval = 10000 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Initialized and validated org.apache.spark.streaming.dstream.InternalMapWithStateDStream@5b0151fb
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Initialized and validated org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@6ff117df
>> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@6275db71
>> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Initialized and validated org.apache.beam.runners.spark.io.SparkUnboundedSource$ReadReportDStream@3ef0d3c5
>> 20/09/21 01:15:37 INFO SourceDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO SourceDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO SourceDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO SourceDStream: Remember interval = 10000 ms
>> 20/09/21 01:15:37 INFO SourceDStream: Initialized and validated org.apache.beam.runners.spark.io.SourceDStream@49db1a95
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Storage level = Memory Deserialized 1x Replicated
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Checkpoint interval = 5000 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Remember interval = 10000 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Initialized and validated org.apache.spark.streaming.dstream.InternalMapWithStateDStream@5b0151fb
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Initialized and validated org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@6ff117df
>> 20/09/21 01:15:37 INFO FlatMappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO FlatMappedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO FlatMappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO FlatMappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@4c293f9c
>> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@3430988a
>> 20/09/21 01:15:37 INFO TransformedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO TransformedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO TransformedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Initialized and validated org.apache.spark.streaming.dstream.TransformedDStream@17ab8c32
>> 20/09/21 01:15:37 INFO FilteredDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO FilteredDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO FilteredDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Initialized and validated org.apache.spark.streaming.dstream.FilteredDStream@14660807
>> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@8f826ef
>> 20/09/21 01:15:37 INFO TransformedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO TransformedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO TransformedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Initialized and validated org.apache.spark.streaming.dstream.TransformedDStream@7e734051
>> 20/09/21 01:15:37 INFO FilteredDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO FilteredDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO FilteredDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Initialized and validated org.apache.spark.streaming.dstream.FilteredDStream@1be77a7
>> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@25bae116
>> 20/09/21 01:15:37 INFO TransformedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO TransformedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO TransformedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Initialized and validated org.apache.spark.streaming.dstream.TransformedDStream@6e41c8a5
>> 20/09/21 01:15:37 INFO FilteredDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO FilteredDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO FilteredDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Initialized and validated org.apache.spark.streaming.dstream.FilteredDStream@362e3875
>> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@22aaf27c
>> 20/09/21 01:15:37 INFO ForEachDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO ForEachDStream: Storage level = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO ForEachDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO ForEachDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@3bd7e62b
>> 20/09/21 01:15:37 INFO RecurringTimer: Started timer for JobGenerator at time 1600668938000
>> 20/09/21 01:15:37 INFO JobGenerator: Started JobGenerator at 1600668938000 ms
>> 20/09/21 01:15:37 INFO JobScheduler: Started JobScheduler
>> 20/09/21 01:15:37 INFO StreamingContext: StreamingContext started
>> 20/09/21 01:15:37 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
>> 20/09/21 01:15:37 INFO BatchedWriteAheadLog: BatchedWriteAheadLog shutting down at time: 1600668937717.
>> 20/09/21 01:15:37 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer queue interrupted.
>> 20/09/21 01:15:37 INFO BatchedWriteAheadLog: BatchedWriteAheadLog Writer thread exiting.
>> 20/09/21 01:15:37 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Stopped write ahead log manager
>> 20/09/21 01:15:37 INFO ReceiverTracker: ReceiverTracker stopped
>> 20/09/21 01:15:37 INFO JobGenerator: Stopping JobGenerator immediately
>> 20/09/21 01:15:37 INFO RecurringTimer: Stopped timer for JobGenerator after time -1
>> 20/09/21 01:15:37 INFO CheckpointWriter: CheckpointWriter executor terminated? true, waited for 0 ms.
>> 20/09/21 01:15:37 INFO JobGenerator: Stopped JobGenerator
>> 20/09/21 01:15:37 INFO JobScheduler: Stopped JobScheduler
>> 20/09/21 01:15:37 INFO StreamingContext: StreamingContext stopped successfully
>> 20/09/21 01:15:37 INFO SparkContext: Invoking stop() from shutdown hook
>> 20/09/21 01:15:37 INFO SparkUI: Stopped Spark web UI at http://10.0.10.123:4040
>> 20/09/21 01:15:37 INFO StandaloneSchedulerBackend: Shutting down all executors
>> 20/09/21 01:15:37 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
>> 20/09/21 01:15:37 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
>> 20/09/21 01:15:37 INFO MemoryStore: MemoryStore cleared
>> 20/09/21 01:15:37 INFO BlockManager: BlockManager stopped
>> 20/09/21 01:15:37 INFO BlockManagerMaster: BlockManagerMaster stopped
>> 20/09/21 01:15:37 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
>> 20/09/21 01:15:37 INFO SparkContext: Successfully stopped SparkContext
>> 20/09/21 01:15:37 INFO ShutdownHookManager: Shutdown hook called
>> 20/09/21 01:15:37 INFO ShutdownHookManager: Deleting directory /private/var/folders/kl/98ycp0vj19z082xtn76h07r00000gn/T/spark-fac2b91e-8bfc-4855-b0f5-bbb206bb678b
>> 20/09/21 01:15:37 INFO ShutdownHookManager: Deleting directory /private/var/folders/kl/98ycp0vj19z082xtn76h07r00000gn/T/spark-e3447ff1-ca22-4574-9864-834fac63eea3
>>
>>