Posted to user@spark.apache.org by Yuta Morisawa <yu...@kddi-research.jp> on 2018/05/09 07:14:36 UTC

Spark 2.3.0 Structured Streaming Kafka Timestamp

Hi All

I'm trying to extract the Kafka timestamp from records in Kafka topics.

The timestamp I get does not contain millisecond information,
but it should, because the ConsumerRecord class of Kafka 0.10
supports millisecond timestamps.

How can I get the millisecond timestamp from Kafka topics?


These are the pages I referred to:

https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html

https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/processor/TimestampExtractor.html


And this is my code.
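----
// `spark` is an existing SparkSession
import spark.implicits._  // encoders required by .as[(Long, String)]

val df = spark
   .readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1,topic2")
   .load()
   // the value returned by this cast has no millisecond part
   .selectExpr("CAST(timestamp AS LONG)", "CAST(value AS STRING)")
   .as[(Long, String)]
----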

Regards,
Yuta


---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Spark 2.3.0 Structured Streaming Kafka Timestamp

Posted by Michael Armbrust <mi...@databricks.com>.
Hmm yeah that does look wrong.  Would be great if someone opened a PR to
correct the docs :)

On Thu, May 10, 2018 at 5:13 PM Yuta Morisawa <yu...@kddi-research.jp>
wrote:

> The problem is solved.
> The actual schema of the Kafka source differs from the documentation.
>
>
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>
> The documentation says the "timestamp" column is of type long,
> but its actual type is timestamp.
>
>
> Below are the code I used to check the schema and its output.
>
> -code
> // bootstrapServers, subscribeType and topics are defined elsewhere
> spark
>        .read
>        .format("kafka")
>        .option("kafka.bootstrap.servers", bootstrapServers)
>        .option(subscribeType, topics)
>        .load()
>        .printSchema()  // returns Unit, so the result is not assigned
>
> -result
> root
>   |-- key: binary (nullable = true)
>   |-- value: binary (nullable = true)
>   |-- topic: string (nullable = true)
>   |-- partition: integer (nullable = true)
>   |-- offset: long (nullable = true)
>   |-- timestamp: timestamp (nullable = true)
>   |-- timestampType: integer (nullable = true)
>
>
> Regards,
> Yuta
>

Re: Spark 2.3.0 Structured Streaming Kafka Timestamp

Posted by Yuta Morisawa <yu...@kddi-research.jp>.
The problem is solved.
The actual schema of the Kafka source differs from the documentation.

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

The documentation says the "timestamp" column is of type long,
but its actual type is timestamp.


Below are the code I used to check the schema and its output.

-code
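// bootstrapServers, subscribeType and topics are defined elsewhere
spark
       .read
       .format("kafka")
       .option("kafka.bootstrap.servers", bootstrapServers)
       .option(subscribeType, topics)
       .load()
       .printSchema()  // returns Unit, so the result is not assigned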

-result
root
  |-- key: binary (nullable = true)
  |-- value: binary (nullable = true)
  |-- topic: string (nullable = true)
  |-- partition: integer (nullable = true)
  |-- offset: long (nullable = true)
  |-- timestamp: timestamp (nullable = true)
  |-- timestampType: integer (nullable = true)


Regards,
Yuta
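
Since the column is of timestamp type, casting it to LONG gives whole
seconds since the epoch, which would explain the missing milliseconds
in the original question. A minimal sketch of one way to recover epoch
milliseconds follows. It is written for spark-shell or a Scala script,
uses a local session and a hand-built DataFrame as a stand-in for the
Kafka source (no broker involved), and the sample value is arbitrary.
Going through DOUBLE and rounding is just one option, not necessarily
the recommended one.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, round}

// Local session for illustration only; a real job would reuse its own session.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("kafka-timestamp-millis-sketch")
  .getOrCreate()
import spark.implicits._

// Stand-in for the Kafka source's "timestamp" column (assumption: arbitrary
// sample value with a 123 ms fractional part).
val df = Seq(new Timestamp(1525850076123L)).toDF("timestamp")

val row = df.select(
  // timestamp -> long yields whole seconds, dropping the milliseconds
  col("timestamp").cast("long").as("seconds"),
  // timestamp -> double keeps the fraction; scale to ms and round
  round(col("timestamp").cast("double") * 1000).cast("long").as("millis")
).head()

val seconds = row.getLong(0)
val millis  = row.getLong(1)
println(s"seconds=$seconds millis=$millis")
```

In the streaming query from the original post, the equivalent SQL
expression would be CAST(ROUND(CAST(timestamp AS DOUBLE) * 1000) AS LONG)
in place of CAST(timestamp AS LONG). Keeping the column as a timestamp
and formatting it later is another option.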



Re: Spark 2.3.0 --files vs. addFile()

Posted by "Lalwani, Jayesh" <Ja...@capitalone.com>.
This is a long-standing bug in Spark: --jars and --files don't work in standalone mode.
https://issues.apache.org/jira/browse/SPARK-4160

From: Marius <m....@gmail.com>
Date: Wednesday, May 9, 2018 at 3:51 AM
To: "user@spark.apache.org" <us...@spark.apache.org>
Subject: Spark 2.3.0 --files vs. addFile()

Hey,

I am using Spark to distribute the execution of a binary tool and to do some further calculation downstream. I want to distribute the binary using either the --files option or addFile() so that it is available on each worker node. However, although Spark reports that it added the file:
2018-05-09 07:42:19 INFO  SparkContext:54 - Added file s3a://executables/blastp at s3a://executables/foo with timestamp 1525851739972
2018-05-09 07:42:20 INFO  Utils:54 - Fetching s3a://executables/foo to /tmp/spark-54931ea6-b3d6-419b-997b-a498da898b77/userFiles-5e4b66e5-de4a-4420-a641-4453b9ea2ead/fetchFileTemp3437582648265876247.tmp

when I try to execute the tool using pipe, it does not work. I currently assume that the file is only downloaded to the master node, but I am not sure whether I misunderstood the concept of adding files in Spark or did something wrong.
I get the path with SparkFiles.get(). The call succeeds, but the binary is not there.

This is my call:

spark-submit \
--class de.jlu.bioinfsys.sparkBlast.run.Run \
--master $master \
--jars ${awsPath},${awsJavaSDK} \
--files s3a://database/a.a.z,s3a://database/a.a.y,s3a://database/a.a.x,s3a://executables/tool \
--conf spark.executor.extraClassPath=${awsPath}:${awsJavaSDK} \
--conf spark.driver.extraClassPath=${awsPath}:${awsJavaSDK} \
--conf spark.hadoop.fs.s3a.endpoint=https://s3.computational.bio.uni-giessen.de/ \
--conf spark.hadoop.fs.s3a.access.key=$s3Access \
--conf spark.hadoop.fs.s3a.secret.key=$s3Secret \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
${execJarPath}

I am using Spark 2.3.0 with Scala in standalone cluster mode with three workers.

Cheers
Marius





Spark 2.3.0 --files vs. addFile()

Posted by Marius <m....@gmail.com>.
Hey,

I am using Spark to distribute the execution of a binary tool and to do
some further calculation downstream. I want to distribute the binary
using either the --files option or addFile() so that it is available on
each worker node. However, although Spark reports that it added the
file:
2018-05-09 07:42:19 INFO  SparkContext:54 - Added file s3a://executables/blastp at s3a://executables/foo with timestamp 1525851739972
2018-05-09 07:42:20 INFO  Utils:54 - Fetching s3a://executables/foo to /tmp/spark-54931ea6-b3d6-419b-997b-a498da898b77/userFiles-5e4b66e5-de4a-4420-a641-4453b9ea2ead/fetchFileTemp3437582648265876247.tmp

when I try to execute the tool using pipe, it does not work. I
currently assume that the file is only downloaded to the master node,
but I am not sure whether I misunderstood the concept of adding files in
Spark or did something wrong.
I get the path with SparkFiles.get(). The call succeeds, but the binary
is not there.

This is my call:

spark-submit \
--class de.jlu.bioinfsys.sparkBlast.run.Run \
--master $master \
--jars ${awsPath},${awsJavaSDK} \
--files s3a://database/a.a.z,s3a://database/a.a.y,s3a://database/a.a.x,s3a://executables/tool \
--conf spark.executor.extraClassPath=${awsPath}:${awsJavaSDK} \
--conf spark.driver.extraClassPath=${awsPath}:${awsJavaSDK} \
--conf spark.hadoop.fs.s3a.endpoint=https://s3.computational.bio.uni-giessen.de/ \
--conf spark.hadoop.fs.s3a.access.key=$s3Access \
--conf spark.hadoop.fs.s3a.secret.key=$s3Secret \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
${execJarPath}

I am using Spark 2.3.0 with Scala in standalone cluster mode with
three workers.

Cheers
Marius
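
Separately from the JIRA issue mentioned in the reply, one thing that
may be worth ruling out is where SparkFiles.get() is evaluated. Called
on the driver, it returns a path under the driver's own temporary
directory, and a command string built from that path will not
necessarily exist on the executors. The sketch below resolves the path
inside the task instead. It is an illustration only: it runs in local
mode (so it says nothing about whether a standalone cluster actually
ships the file), it assumes a Unix-like machine with sh and tr, and the
tiny shell script is a hypothetical stand-in for the real binary.

```scala
import java.io.ByteArrayInputStream
import java.nio.file.Files
import scala.sys.process._
import org.apache.spark.SparkFiles
import org.apache.spark.sql.SparkSession

// Local session for illustration; on a cluster the master comes from spark-submit.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("sparkfiles-in-task-sketch")
  .getOrCreate()
val sc = spark.sparkContext

// Hypothetical stand-in for the real tool: a script that upper-cases stdin.
val script = Files.createTempFile("tool", ".sh")
Files.write(script, "#!/bin/sh\ntr a-z A-Z\n".getBytes("UTF-8"))
sc.addFile(script.toString)
val toolName = script.getFileName.toString

val results = sc.parallelize(Seq("acgt", "ttga"), 2).mapPartitions { lines =>
  // Resolved inside the task, so the path refers to the executor's copy.
  val toolPath = SparkFiles.get(toolName)
  val input = lines.mkString("", "\n", "\n")
  // Feed the partition to the tool on stdin and capture its stdout.
  val output =
    (Seq("sh", toolPath) #< new ByteArrayInputStream(input.getBytes("UTF-8"))).!!
  output.split("\n").iterator
}.collect()

results.foreach(println)
```

If the file is still missing when the path is resolved inside the task
on the real cluster, that would point back to the distribution problem
described in the reply rather than to how the path is looked up.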