You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:13:33 UTC
[jira] [Resolved] (SPARK-23766) Not able to execute multiple
queries in spark structured streaming
[ https://issues.apache.org/jira/browse/SPARK-23766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-23766.
----------------------------------
Resolution: Incomplete
> Not able to execute multiple queries in spark structured streaming
> ------------------------------------------------------------------
>
> Key: SPARK-23766
> URL: https://issues.apache.org/jira/browse/SPARK-23766
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Apeksha Agnihotri
> Priority: Major
> Labels: bulk-closed
>
> I am able to receive the output of the first query (i.e. "reader") only, although the logs show all the queries as running. No data is stored in HDFS either.
>
> {code:java}
> public class A extends D implements Serializable {
> public Dataset<Row> getDataSet(SparkSession session) {
> Dataset<Row> dfs = session.readStream().format("socket").option("host", hostname).option("port", port).load();
> publish(dfs.toDF(), "reader");
> return dfs;
> }
> }
> public class B extends D implements Serializable {
> public Dataset<Row> execute(Dataset<Row> ds) {
> Dataset<Row> d = ds.select(functions.explode(functions.split(ds.col("value"), "\\s+")));
> publish(d.toDF(), "component");
> return d;
> }
> }
> public class C extends D implements Serializable {
> public Dataset<Row> execute(Dataset<Row> ds) {
> publish(inputDataSet.toDF(), "console");
> ds.writeStream().format("csv").option("path", "hdfs://hostname:9000/user/abc/data1/")
> .option("checkpointLocation", "hdfs://hostname:9000/user/abc/cp").outputMode("append").start();
> return ds;
> }
> }
> public class D {
> public void publish(Dataset<Row> dataset, String name) {
> dataset.writeStream().format("csv").queryName(name).option("path", "hdfs://hostname:9000/user/abc/" + name)
> .option("checkpointLocation", "hdfs://hostname:9000/user/abc/checkpoint/" + directory).outputMode("append")
> .start();
> }
> }
> public static void main(String[] args) {
> SparkSession session = createSession();
> try {
> A a = new A();
> Dataset<Row> records = a.getDataSet(session);
> B b = new B();
> Dataset<Row> ds = b.execute(records);
> C c = new C();
> c.execute(ds);
> session.streams().awaitAnyTermination();
> } catch (StreamingQueryException e) {
> e.printStackTrace();
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org