Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2019/03/26 14:25:14 UTC

[GitHub] [incubator-hudi] leilinen opened a new issue #615: how change HoodieDeltaStreamer with Real-time calculation

URL: https://github.com/apache/incubator-hudi/issues/615
 
 
   
   
   Hi, in my project I want to consume data from a Kafka topic and upsert it with Hoodie. HoodieDeltaStreamer is driven by a Spark context, which makes it a batch program: it exits after processing one batch of data. So I use a **Spark streaming context** instead. However, HoodieDeltaStreamer works with RDDs, while the Spark streaming context gives me a DStream from the Kafka topic, created with the following code:
   
   ```
   private JavaDStream<GenericRecord> toDStream(OffsetRange[] offsetRanges) {
       String topicName = props.getString(KAFKA_TOPIC_NAME);
       // The Kafka 0.8 direct stream yields (key, value) pairs; HoodieKafkaAvroDecoder decodes the value.
       JavaPairInputDStream<String, Object> pairStream = KafkaUtils.createDirectStream(
               javaStreamingContext, String.class, Object.class, StringDecoder.class, HoodieKafkaAvroDecoder.class,
               offsetGen.getKafkaParams(), new HashSet<>(Arrays.asList(topicName)));
       // Keep only the decoded value (the pair's second element) and cast it to an Avro GenericRecord.
       return pairStream.map(pair -> (GenericRecord) pair._2());
   }
   
   ```
   recordStream is a JavaDStream<GenericRecord>, but HoodieDeltaStreamer needs a JavaRDD<GenericRecord>, so I have to convert the former into the latter. According to the JavaDStream API, the compute(Time) method can **Generate an RDD for the given duration**. However, when my program reaches this step:
   
   ```
   JavaDStream<GenericRecord> newDataStream = toDStream(offsetRanges);
   if (newDataStream == null) {
      throw new HoodieException("Cannot fetch new stream data!");
   }
   log.info("fetch new stream data");
   newDataStream.print();
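   // This call throws the exception below: compute() runs before the streaming context has been started.
   newDataRDD = newDataStream.compute(new Time(3000L));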
   
   ```
   
   I got the following exception:
   
   ```
   org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@62065d12 has not been initialized
           at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:312)
           at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:88)
           at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
           at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
           at scala.Option.orElse(Option.scala:289)
           at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
           at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
           at org.apache.spark.streaming.api.java.JavaDStream.compute(JavaDStream.scala:58)
           at com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.createContext(HoodieDeltaStreamer.java:406)
           at com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.lambda$new$b912b50a$1(HoodieDeltaStreamer.java:193)
           at org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:627)
           at org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:626)
           at scala.Option.getOrElse(Option.scala:121)
   ```
   
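   As far as I can tell, this happens because the DStream's zeroTime is still null (it is only set when the streaming context starts), and compute() first checks whether the requested time is valid, a check that fails for a stream that has not been initialized.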
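A simplified, stdlib-only Java model may help illustrate this behaviour. MiniDStream is a hypothetical class, not Spark's implementation: it only mirrors the idea that a DStream has no zeroTime until the streaming context starts, and that batches exist only at absolute times zeroTime + k * batchDuration. Because Spark's Time is an absolute timestamp in milliseconds since the epoch, new Time(3000L) refers to 3 seconds after 1970-01-01 rather than a 3-second window, so in this model it is rejected even after initialization. The runBatches method imitates the push-style foreachRDD pattern, where the scheduler hands each generated batch to user code instead of user code pulling it out with compute().

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.LongFunction;

// Hypothetical, simplified model of Spark Streaming batch generation.
// Not Spark's code: it only mirrors the zeroTime / batch-boundary checks.
class MiniDStream<T> {
    private final long batchDurationMs;
    private final LongFunction<List<T>> batchSource; // produces the records of one batch
    private Long zeroTime = null; // set only by start(), like DStream initialization

    MiniDStream(long batchDurationMs, LongFunction<List<T>> batchSource) {
        this.batchDurationMs = batchDurationMs;
        this.batchSource = batchSource;
    }

    // Called when the (model) streaming context starts.
    void start(long startTimeMs) {
        zeroTime = startTimeMs;
    }

    // Mirrors the idea of DStream.isTimeValid: an uninitialized stream throws;
    // otherwise only times after zeroTime that fall on a batch boundary are valid.
    boolean isTimeValid(long timeMs) {
        if (zeroTime == null) {
            throw new IllegalStateException("stream has not been initialized");
        }
        return timeMs > zeroTime && (timeMs - zeroTime) % batchDurationMs == 0;
    }

    // Pull style: returns the batch for an absolute time, or null if the time is invalid.
    List<T> compute(long timeMs) {
        return isTimeValid(timeMs) ? batchSource.apply(timeMs) : null;
    }

    // Push style, analogous to foreachRDD: each generated batch is handed to the callback.
    void runBatches(int numBatches, BiConsumer<Long, List<T>> foreachBatch) {
        if (zeroTime == null) {
            throw new IllegalStateException("stream has not been initialized");
        }
        for (int k = 1; k <= numBatches; k++) {
            long batchTime = zeroTime + k * batchDurationMs;
            foreachBatch.accept(batchTime, compute(batchTime));
        }
    }
}
```

In real Spark code, the analogous shape would be to register newDataStream.foreachRDD((rdd, time) -> { ... }) inside the context factory, run the RDD-based upsert inside that callback, and then call start() on the JavaStreamingContext. Treat this as a suggested direction under the assumptions above, not a confirmed fix.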
   
   
   Alternatively, I am considering replacing HoodieDeltaStreamer with **Flink**. I have already created a Flink source that consumes data from the Kafka topic, but I do not know how to create a Flink sink that writes through the Hoodie write client.
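One common way to bridge a record-at-a-time sink and a batch-oriented writer is to buffer records and flush them as one batch. The sketch below is stdlib-only and hypothetical: BatchWriter stands in for whatever call would perform the Hoodie upsert, and BufferingSink shows only the buffering logic. In an actual Flink job, add() would presumably be called from a RichSinkFunction's invoke() and flush() from CheckpointedFunction.snapshotState(), but that wiring, and how to reach the Hoodie write client from Flink, are assumptions not covered here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a batch writer such as a Hoodie write client:
// it accepts a whole batch of records at once (e.g. one upsert per batch).
interface BatchWriter<T> {
    void upsertBatch(List<T> batch);
}

// Sketch of the "buffer, then flush" sink pattern. Flink/Hudi wiring is not shown.
class BufferingSink<T> {
    private final BatchWriter<T> writer;
    private final int maxBatchSize;
    private final List<T> buffer = new ArrayList<>();

    BufferingSink(BatchWriter<T> writer, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.writer = writer;
        this.maxBatchSize = maxBatchSize;
    }

    // Called once per incoming record; flushes as soon as the buffer is full.
    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Writes any buffered records as one batch (e.g. on checkpoint or close).
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        writer.upsertBatch(new ArrayList<>(buffer)); // hand the writer its own copy
        buffer.clear();
    }
}
```

Flushing on checkpoint as well as on size is meant to ensure that records covered by a completed checkpoint have actually been handed to the writer, at the cost of smaller batches when traffic is light.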
   
   I am stuck on both approaches. Could you give me some suggestions? Thank you!
