You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Michael Quinlan <mq...@gmail.com> on 2014/09/23 06:17:33 UTC

Java Implementation of StreamingContext.fileStream

I'm attempting to code a Java only implementation accessing the
StreamingContext.fileStream method and am especially interested in setting
the boolean "newFilesOnly" to false. Unfortunately my code throws
exceptions:

Exception in thread "main" java.lang.InstantiationException
	at
sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
	at java.lang.Class.newInstance(Class.java:374)
	at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:83)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)

whenever the files are opened. The exceptions are generated whether or not I
invoke the longer form of the fileStream method. I can use the
JavaStreamingContext version successfully, but don't have access to the
boolean flag in this case. If someone sees an issue with the code below, I
would be very grateful for a nudge in the right direction.

                SparkConf conf = new SparkConf();
                conf.setMaster("local[2]");
                conf.setAppName("SparkStreamingFileTest");
                conf.set("spark.cores.max", "1");
                conf.set("spark.executor.memory","1g");

                List<String> inputjarslist = new ArrayList<String>();
               
inputjarslist.add("/home/usr/target/lib/scala-library-2.10.1.jar");
               
inputjarslist.add("/home/usr/target/lib/spark-assembly-1.0.2-hadoop2.2.0.jar");
               
inputjarslist.add("/home/usr/target/lib/spark-streaming_2.10-1.0.2.jar");

                //Seq<String> inputjars = asScalaBuffer(inputjarslist);
                conf.setJars(inputjarslist.toArray(new String[3]));

                StreamingContext scc = new StreamingContext(conf, new
Duration(10000));

                Seq<String> thejars = scc.sc().jars();
                scala.collection.Iterator iter = thejars.iterator();
                if(!(iter.hasNext())) System.out.println("no jars
associated!!");
                while (iter.hasNext()) {
                        System.out.println("Jar in system: "+iter.next());
                }

                Function1<Path,Object> f = new
AbstractFunction1<Path,Object>() {
                        public Boolean apply(Path input){
                          return true;
                        }
                };

                //scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);

                ClassTag <LongWritable> k =
scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
                ClassTag <Text> v
=scala.reflect.ClassTag$.MODULE$.apply(Text.class);
                ClassTag <InputFormat&lt;LongWritable,Text>> t =
scala.reflect.ClassTag$.MODULE$.apply(InputFormat.class);

                InputDStream<Tuple2&lt;LongWritable,Text>> ans =
scc.fileStream("/home/usr/testDataDirectory", f, false, k, v, t);
                //InputDStream<Tuple2&lt;LongWritable,Text>> ans =
scc.fileStream("/home/usr/testDataDirectory",k,v,t);

                ans.print();





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Java-Implementation-of-StreamingContext-fileStream-tp14863.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Java Implementation of StreamingContext.fileStream

Posted by Michael Quinlan <mq...@gmail.com>.
Thanks very much for the pointer, which validated my initial approach. It
turns out that I was creating a tag for the abstract class
"InputFormat.class". Using "TextInputFormat.class" instead fixed my issue.

Regards,

Mike



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Java-Implementation-of-StreamingContext-fileStream-tp14863p14923.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Java Implementation of StreamingContext.fileStream

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Here's a working version that we have.


> DStream<Tuple2<Text, Tuple>> hadoopDStream =
> streamingContext.fileStream("/akhld/lookhere/", new Function<Path,
> Object>(){
> @Override
> public Object call(Path path) throws Exception {
> // TODO Auto-generated method stub
> return !path.getName().startsWith(".");
> } }, true, SparkUtil.getManifest(Text.class),
> SparkUtil.getManifest(Tuple.class),
> SparkUtil.getManifest(PigInputFormat.class));


Thanks
Best Regards

On Tue, Sep 23, 2014 at 9:47 AM, Michael Quinlan <mq...@gmail.com> wrote:

> I'm attempting to code a Java only implementation accessing the
> StreamingContext.fileStream method and am especially interested in setting
> the boolean "newFilesOnly" to false. Unfortunately my code throws
> exceptions:
>
> Exception in thread "main" java.lang.InstantiationException
>         at
>
> sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>         at java.lang.Class.newInstance(Class.java:374)
>         at
> org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:83)
>         at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>         at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>
> whenever the files are opened. The exceptions are generated whether or not
> I
> invoke the longer form of the fileStream method. I can use the
> JavaStreamingContext version successfully, but don't have access to the
> boolean flag in this case. If someone sees an issue with the code below, I
> would be very grateful for a nudge in the right direction.
>
>                 SparkConf conf = new SparkConf();
>                 conf.setMaster("local[2]");
>                 conf.setAppName("SparkStreamingFileTest");
>                 conf.set("spark.cores.max", "1");
>                 conf.set("spark.executor.memory","1g");
>
>                 List<String> inputjarslist = new ArrayList<String>();
>
> inputjarslist.add("/home/usr/target/lib/scala-library-2.10.1.jar");
>
>
> inputjarslist.add("/home/usr/target/lib/spark-assembly-1.0.2-hadoop2.2.0.jar");
>
> inputjarslist.add("/home/usr/target/lib/spark-streaming_2.10-1.0.2.jar");
>
>                 //Seq<String> inputjars = asScalaBuffer(inputjarslist);
>                 conf.setJars(inputjarslist.toArray(new String[3]));
>
>                 StreamingContext scc = new StreamingContext(conf, new
> Duration(10000));
>
>                 Seq<String> thejars = scc.sc().jars();
>                 scala.collection.Iterator iter = thejars.iterator();
>                 if(!(iter.hasNext())) System.out.println("no jars
> associated!!");
>                 while (iter.hasNext()) {
>                         System.out.println("Jar in system: "+iter.next());
>                 }
>
>                 Function1<Path,Object> f = new
> AbstractFunction1<Path,Object>() {
>                         public Boolean apply(Path input){
>                           return true;
>                         }
>                 };
>
>
> //scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
>
>                 ClassTag <LongWritable> k =
> scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
>                 ClassTag <Text> v
> =scala.reflect.ClassTag$.MODULE$.apply(Text.class);
>                 ClassTag <InputFormat&lt;LongWritable,Text>> t =
> scala.reflect.ClassTag$.MODULE$.apply(InputFormat.class);
>
>                 InputDStream<Tuple2&lt;LongWritable,Text>> ans =
> scc.fileStream("/home/usr/testDataDirectory", f, false, k, v, t);
>                 //InputDStream<Tuple2&lt;LongWritable,Text>> ans =
> scc.fileStream("/home/usr/testDataDirectory",k,v,t);
>
>                 ans.print();
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Java-Implementation-of-StreamingContext-fileStream-tp14863.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>