Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:13:33 UTC

[jira] [Resolved] (SPARK-23766) Not able to execute multiple queries in spark structured streaming

     [ https://issues.apache.org/jira/browse/SPARK-23766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-23766.
----------------------------------
    Resolution: Incomplete

> Not able to execute multiple queries in spark structured streaming
> ------------------------------------------------------------------
>
>                 Key: SPARK-23766
>                 URL: https://issues.apache.org/jira/browse/SPARK-23766
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Apeksha Agnihotri
>            Priority: Major
>              Labels: bulk-closed
>
> I only receive output from the first query (i.e. the reader). The logs show that all of the queries are running, but no data is written to HDFS either.
>  
> {code:java}
> public class A extends D implements Serializable {
>     private String hostname; // assumed to be initialized elsewhere
>     private int port;        // assumed to be initialized elsewhere
>
>     public Dataset<Row> getDataSet(SparkSession session) {
>         Dataset<Row> dfs = session.readStream()
>                 .format("socket")
>                 .option("host", hostname)
>                 .option("port", port)
>                 .load();
>         publish(dfs.toDF(), "reader");
>         return dfs;
>     }
> }
> public class B extends D implements Serializable {
>     public Dataset<Row> execute(Dataset<Row> ds) {
>        Dataset<Row> d = ds.select(functions.explode(functions.split(ds.col("value"), "\\s+")));
>         publish(d.toDF(), "component");
>         return d;
>     }
> }
> public class C extends D implements Serializable {
>     public Dataset<Row> execute(Dataset<Row> ds) {
>         publish(ds.toDF(), "console");
>         ds.writeStream().format("csv").option("path", "hdfs://hostname:9000/user/abc/data1/")
>                 .option("checkpointLocation", "hdfs://hostname:9000/user/abc/cp").outputMode("append").start();
>         return ds;
>     }
> }
> public class D {
>     public void publish(Dataset<Row> dataset, String name) {
>         dataset.writeStream().format("csv").queryName(name).option("path", "hdfs://hostname:9000/user/abc/" + name)
>                 .option("checkpointLocation", "hdfs://hostname:9000/user/abc/checkpoint/" + name).outputMode("append")
>                 .start();
>     }
> }
> public static void main(String[] args) {
>     SparkSession session = createSession();
>     try {
>         A a = new A();
>         Dataset<Row> records = a.getDataSet(session);
>         B b = new B();
>         Dataset<Row> ds = b.execute(records);
>         C c = new C();
>         c.execute(ds);
>         session.streams().awaitAnyTermination();
>     } catch (StreamingQueryException e) {
>         e.printStackTrace();
>     }
> }
> {code}
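The transformation in class B amounts to splitting each input line on whitespace and emitting one row per token (`functions.split` followed by `functions.explode`). A minimal sketch of that core string logic in plain Java, independent of Spark (`SplitSketch` and `explodeOnWhitespace` are illustrative names, not part of the reported code):

```java
import java.util.Arrays;
import java.util.List;

public class SplitSketch {
    // Mirrors split(col("value"), "\\s+") followed by explode:
    // one output element per whitespace-separated token.
    static List<String> explodeOnWhitespace(String line) {
        return Arrays.asList(line.trim().split("\\s+"));
    }

    public static void main(String[] args) {
        // Each token would become its own row in the exploded Dataset.
        System.out.println(explodeOnWhitespace("hello structured  streaming"));
    }
}
```

Note that each `publish()` call above starts an independent streaming query, so each one needs its own distinct `checkpointLocation`, which the quoted code does provide via the per-query name suffix.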



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
