Posted to user@spark.apache.org by Michael Quinlan <mq...@gmail.com> on 2014/09/23 06:17:33 UTC
Java Implementation of StreamingContext.fileStream
I'm attempting to write a Java-only implementation that calls the
StreamingContext.fileStream method, and I'm especially interested in setting
the boolean "newFilesOnly" to false. Unfortunately, my code throws an
exception:
Exception in thread "main" java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:374)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:83)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
whenever the files are opened. The exceptions are generated whether or not I
invoke the longer form of the fileStream method. I can use the
JavaStreamingContext version successfully, but don't have access to the
boolean flag in this case. If someone sees an issue with the code below, I
would be very grateful for a nudge in the right direction.
SparkConf conf = new SparkConf();
conf.setMaster("local[2]");
conf.setAppName("SparkStreamingFileTest");
conf.set("spark.cores.max", "1");
conf.set("spark.executor.memory", "1g");

List<String> inputjarslist = new ArrayList<String>();
inputjarslist.add("/home/usr/target/lib/scala-library-2.10.1.jar");
inputjarslist.add("/home/usr/target/lib/spark-assembly-1.0.2-hadoop2.2.0.jar");
inputjarslist.add("/home/usr/target/lib/spark-streaming_2.10-1.0.2.jar");
//Seq<String> inputjars = asScalaBuffer(inputjarslist);
conf.setJars(inputjarslist.toArray(new String[3]));

StreamingContext scc = new StreamingContext(conf, new Duration(10000));

Seq<String> thejars = scc.sc().jars();
scala.collection.Iterator iter = thejars.iterator();
if (!(iter.hasNext())) System.out.println("no jars associated!!");
while (iter.hasNext()) {
    System.out.println("Jar in system: " + iter.next());
}

Function1<Path,Object> f = new AbstractFunction1<Path,Object>() {
    public Boolean apply(Path input) {
        return true;
    }
};

//scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
ClassTag<LongWritable> k = scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
ClassTag<Text> v = scala.reflect.ClassTag$.MODULE$.apply(Text.class);
ClassTag<InputFormat<LongWritable,Text>> t = scala.reflect.ClassTag$.MODULE$.apply(InputFormat.class);

InputDStream<Tuple2<LongWritable,Text>> ans = scc.fileStream("/home/usr/testDataDirectory", f, false, k, v, t);
//InputDStream<Tuple2<LongWritable,Text>> ans = scc.fileStream("/home/usr/testDataDirectory",k,v,t);

ans.print();
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Java-Implementation-of-StreamingContext-fileStream-tp14863.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org
Re: Java Implementation of StreamingContext.fileStream
Posted by Michael Quinlan <mq...@gmail.com>.
Thanks very much for the pointer, which validated my initial approach. It
turns out that I was creating a ClassTag for the abstract class
"InputFormat.class". Using "TextInputFormat.class" instead fixed my issue.
Regards,
Mike
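
For readers who hit the same trace: the frames above show NewHadoopRDD.getPartitions calling Class.newInstance, and reflective instantiation of an abstract class fails with java.lang.InstantiationException. The following is a minimal sketch of that behaviour in plain Java, without Spark or Hadoop on the classpath. AbstractFormat and ConcreteFormat are hypothetical stand-ins for InputFormat and TextInputFormat, and canInstantiate is a helper invented for this illustration; it also uses getDeclaredConstructor().newInstance() rather than the older Class.newInstance() seen in the trace.

```java
public class InstantiationDemo {

    // Hypothetical stand-in for an abstract class such as InputFormat.
    public abstract static class AbstractFormat {
    }

    // Hypothetical stand-in for a concrete subclass such as TextInputFormat.
    public static class ConcreteFormat extends AbstractFormat {
    }

    // Returns true if the class can be created reflectively through its
    // no-argument constructor, false if it is abstract (or an interface).
    public static boolean canInstantiate(Class<?> cls) {
        try {
            cls.getDeclaredConstructor().newInstance();
            return true;
        } catch (InstantiationException e) {
            // Thrown when the class object represents an abstract class.
            return false;
        } catch (ReflectiveOperationException e) {
            // Missing or inaccessible constructor: not the case studied here.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("AbstractFormat instantiable: "
                + canInstantiate(AbstractFormat.class));
        System.out.println("ConcreteFormat instantiable: "
                + canInstantiate(ConcreteFormat.class));
    }
}
```

Under these assumptions, the class passed in the third ClassTag has to be a concrete one, which is consistent with the switch to TextInputFormat.class resolving the exception.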
Re: Java Implementation of StreamingContext.fileStream
Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Here's a working version that we have.
> DStream<Tuple2<Text, Tuple>> hadoopDStream =
>     streamingContext.fileStream("/akhld/lookhere/",
>         new Function<Path, Object>() {
>             @Override
>             public Object call(Path path) throws Exception {
>                 // TODO Auto-generated method stub
>                 return !path.getName().startsWith(".");
>             }
>         },
>         true,
>         SparkUtil.getManifest(Text.class),
>         SparkUtil.getManifest(Tuple.class),
>         SparkUtil.getManifest(PigInputFormat.class));
Thanks
Best Regards