You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Aravind <ar...@gmail.com> on 2014/07/09 02:33:06 UTC

Spark Streaming using File Stream in Java

Hi all,

I am trying to run the NetworkWordCount.java file in the streaming examples.
The example shows how to read from a network socket. But my usecase is that
, I have a local log file which is a stream and continuously updated (say
/Users/.../Desktop/mylog.log).

I would like to write the same NetworkWordCount.java using this filestream

jssc.fileStream(dataDirectory);

Question:
1. How do I write a mapreduce function for the above to measure wordcounts
(in java, not scala)?

2. Also does the streaming application stop if the file is not updating or
does it continuously poll for the file updates? 

I am a new user of Apache Spark Streaming. Kindly help me as I am totally
stuck....

Thanks in advance.

Regards
Aravind



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-using-File-Stream-in-Java-tp9115.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Spark Streaming using File Stream in Java

Posted by Tathagata Das <ta...@gmail.com>.
The fileStream is not designed to work with continuously updating file, as
the one of the main design goals of Spark is immutability (to guarantee
fault-tolerance by recomputation), and files that are appending (mutating)
defeats that. It rather designed to pickup new files added atomically
(using move) to a directory. So to make it work with your continuously
updated file, you will probably have to write something that periodically
rotates the continuously updated log file into separate files, and then
those files gets copied into a directory.

TD


On Wed, Jul 9, 2014 at 9:34 AM, Aravind <ar...@gmail.com> wrote:

> Hi Akil,
>
> It didnt work. Here is the code...
>
>
> package com.paypal;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.storage.StorageLevel;
> import org.apache.spark.streaming.api.java.JavaPairInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.api.java.*;
> import org.apache.spark.api.java.function.*;
> import org.apache.spark.streaming.*;
> import org.apache.spark.streaming.api.java.*;
>
> import com.google.common.collect.Lists;
>
> import org.apache.spark.streaming.receiver.Receiver;
> import scala.Tuple2;
>
> import java.net.ConnectException;
> import java.net.Socket;
> import java.util.Arrays;
> import java.util.regex.Pattern;
> import java.io.*;
> /**
>  * Hello world!
>  *
>  */
> public class App3
> {
>     private static final Pattern SPACE = Pattern.compile(" ");
>
>     public static void main(String[] args) {
>
>         // Create the context with a 1 second batch size
>         SparkConf sparkConf = new
> SparkConf().setAppName("JavaNetworkWordCount");
>
>         // ******* always give local[4] to execute and see the output
>         JavaStreamingContext ssc = new JavaStreamingContext("local[4]",
> "JavaNetworkWordCount",  new Duration(5000));
>
> // throws an error saying requires JavaPairDstream and not JavaDstream.
>         JavaDStream<String> lines =
> ssc.fileStream("/Users/../Desktop/alarms.log");
>         JavaDStream<String> words = lines.flatMap(
>                 new FlatMapFunction<String, String>() {
>                     public Iterable<String> call(String s) {
>                         return Arrays.asList(s.split(" "));
>                     }
>                 }
>         );
>
>         JavaPairDStream<String, Integer> ones = words.map(
>                 new Function<String, Integer>() {
>                     public Tuple2<String, Integer> call(String s) {
>                         return new Tuple2(s, 1);
>                     }
>                 }
>         );
>
>         JavaPairDStream<String, Integer> counts = ones.reduceByKey(
>                 new Function2<Integer, Integer, Integer>() {
>                     public Integer call(Integer i1, Integer i2) {
>                         return i1 + i2;
>                     }
>                 }
>         );
>
>
>         System.out.println("Hello world");
>         wordCounts.print();
>
>         ssc.start();
>         ssc.awaitTermination();
>     }
>
>
> }
>
> I am not able to figure out how to type cast the objects of Type
> JavaPairDStream to JDstream. Can you provide me a working code for the
> same.
> Thanks in advance.
>
> Regards
> Aravindan
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-using-File-Stream-in-Java-tp9115p9204.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Re: Spark Streaming using File Stream in Java

Posted by Aravind <ar...@gmail.com>.
Hi Akil,

It didnt work. Here is the code...


package com.paypal;

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;

import com.google.common.collect.Lists;

import org.apache.spark.streaming.receiver.Receiver;
import scala.Tuple2;

import java.net.ConnectException;
import java.net.Socket;
import java.util.Arrays;
import java.util.regex.Pattern;
import java.io.*;
/**
 * Hello world!
 *
 */
public class App3
{
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {

        // Create the context with a 1 second batch size
        SparkConf sparkConf = new
SparkConf().setAppName("JavaNetworkWordCount");

        // ******* always give local[4] to execute and see the output
        JavaStreamingContext ssc = new JavaStreamingContext("local[4]",
"JavaNetworkWordCount",  new Duration(5000));

// throws an error saying requires JavaPairDstream and not JavaDstream.
        JavaDStream<String> lines =
ssc.fileStream("/Users/../Desktop/alarms.log");
        JavaDStream<String> words = lines.flatMap(
                new FlatMapFunction<String, String>() {
                    public Iterable<String> call(String s) {
                        return Arrays.asList(s.split(" "));
                    }
                }
        );

        JavaPairDStream<String, Integer> ones = words.map(
                new Function<String, Integer>() {
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2(s, 1);
                    }
                }
        );

        JavaPairDStream<String, Integer> counts = ones.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                }
        );


        System.out.println("Hello world");
        wordCounts.print();

        ssc.start();
        ssc.awaitTermination();
    }


}

I am not able to figure out how to type cast the objects of Type
JavaPairDStream to JDstream. Can you provide me a working code for the same.
Thanks in advance. 

Regards
Aravindan





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-using-File-Stream-in-Java-tp9115p9204.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Spark Streaming using File Stream in Java

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Try this out:

JavaStreamingContext sc = new
JavaStreamingContext(...);JavaDStream<String> lines =
ctx.fileStream("whatever");JavaDStream<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) {
      return Arrays.asList(s.split(" "));
    }
  });

JavaPairDStream<String, Integer> ones = words.map(
  new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) {
      return new Tuple2(s, 1);
    }
  });

JavaPairDStream<String, Integer> counts = ones.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  });


​Actually modified from
https://spark.apache.org/docs/0.9.1/java-programming-guide.html#example​

Thanks
Best Regards


On Wed, Jul 9, 2014 at 6:03 AM, Aravind <ar...@gmail.com> wrote:

> Hi all,
>
> I am trying to run the NetworkWordCount.java file in the streaming
> examples.
> The example shows how to read from a network socket. But my usecase is that
> , I have a local log file which is a stream and continuously updated (say
> /Users/.../Desktop/mylog.log).
>
> I would like to write the same NetworkWordCount.java using this filestream
>
> jssc.fileStream(dataDirectory);
>
> Question:
> 1. How do I write a mapreduce function for the above to measure wordcounts
> (in java, not scala)?
>
> 2. Also does the streaming application stop if the file is not updating or
> does it continuously poll for the file updates?
>
> I am a new user of Apache Spark Streaming. Kindly help me as I am totally
> stuck....
>
> Thanks in advance.
>
> Regards
> Aravind
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-using-File-Stream-in-Java-tp9115.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>