Posted to user@flink.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2018/07/29 08:38:50 UTC

not found: type CustomWatermarkEmitter

Hi,

I have written a simple test program, shown below:

import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.connectors.kafka
object md_streaming
{
  private var zookeeperUrl = "rhes75:2181"
  private var requestConsumerId = null
  private var impressionConsumerId = null
  private var clickConsumerId = null
  private var conversionConsumerId = null
  private var requestTopicName = null
  private var impressionTopicName = null
  private var clickTopicName = null
  private var conversionTopicName = null
  private var requestThreads = 0
  private var impressionThreads = 0
  private var clickThreads = 0
  private var conversionThreads = 0
  private var flinkAppName = "md_streaming"
  private var bootstrapServers = "rhes75:9092, rhes75:9093, rhes75:9094"
  private var schemaRegistryURL = "http://rhes75:8081"
  private var zookeeperConnect = "rhes75:2181"
  private var zookeeperConnectionTimeoutMs = "10000"
  private var rebalanceBackoffMS = "15000"
  private var zookeeperSessionTimeOutMs = "15000"
  private var autoCommitIntervalMS = "12000"
  private var topicsValue = "final"
  private var memorySet = "F"
  private var enableHiveSupport = null
  private var enableHiveSupportValue = "true"
  private var sparkStreamingReceiverMaxRateValue = "0"
  private var checkpointdir = "/checkpoint"
  private var hbaseHost = "rhes75"
  private var zookeeperHost = "rhes564"
  private var zooKeeperClientPort = "2181"
  private var writeDirectory = "hdfs://rhes75:9000/tmp/flink/"
  private var fileName = "md_streaming.txt"
  private val maxServingDelay = 60
  private val servingSpeedFactor = 600f
  private var batchInterval = 2
  private val countWindowLength = 4 // window size in sec
  private val countWindowFrequency =  2   // window trigger interval in sec
  private val earlyCountThreshold = 50
  private val writeToElasticsearch = false // set to true to write results to Elasticsearch
  private val elasticsearchHost = "" // look-up hostname in Elasticsearch log output
  private val elasticsearchPort = 9300

  def main(args: Array[String])
  {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", bootstrapServers)
    properties.setProperty("zookeeper.connect", zookeeperConnect)
    properties.setProperty("group.id", flinkAppName)
    properties.setProperty("auto.offset.reset", "latest")

    val myConsumer = env.addSource(new FlinkKafkaConsumer011[String](
      topicsValue, new SimpleStringSchema(), properties))

    myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
    env.addSource(myConsumer).print()
    //val sink = kafkaSource.writeAsText(writeDirectory+fileName, FileSystem.WriteMode.OVERWRITE)
    //env.execute("Flink Kafka Example writing to "+writeDirectory+fileName)
    env.execute("Flink simple output")
  }
}

However, when compiling I get the following errors:

[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:98: not found: type CustomWatermarkEmitter
[error]     myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
[error]                                                  ^
[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:99: type mismatch;
[error]  found   : org.apache.flink.streaming.api.datastream.DataStreamSource[String]
[error]  required: org.apache.flink.streaming.api.functions.source.SourceFunction[?]
[error]     env.addSource(myConsumer).print()
[error]                   ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Jul 29, 2018 9:31:05 AM
Completed compiling
Sun Jul 29 09:31:05 BST 2018, Running in **Standalone mode**
Could not build the program from JAR file.

I don't see why it is failing. I would appreciate any suggestions.

Regards,

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

Re: not found: type CustomWatermarkEmitter

Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks, I'll check them out.

Regards,

Dr Mich Talebzadeh




Re: not found: type CustomWatermarkEmitter

Posted by vino yang <ya...@gmail.com>.
Hi Mich,

Both errors are straightforward:

1) The compiler cannot find the definition of CustomWatermarkEmitter. Did
you define it? If it is defined somewhere else, import that dependency. A
minimal sketch of such a class is shown below.
2) The type of the variable "myConsumer" is "DataStreamSource", but the
env.addSource method expects a source function. You have actually already
added the source with:

val myConsumer = env.addSource(new FlinkKafkaConsumer011[String](
  topicsValue, new SimpleStringSchema(), properties))

so, just:

myConsumer.print()

It should work.
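
On point 1, a minimal sketch of what a CustomWatermarkEmitter could look
like (an illustration, not code from this thread; it assumes punctuated
watermarks over a String stream whose records carry no parsable event
timestamp, so it falls back to processing time):

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class CustomWatermarkEmitter extends AssignerWithPunctuatedWatermarks[String] {
  // Assumption: the record carries no event timestamp, so use processing
  // time here. Replace this with real event-time extraction in practice.
  override def extractTimestamp(element: String, previousElementTimestamp: Long): Long =
    System.currentTimeMillis()

  // Punctuated: emit a watermark after every element, at the extracted
  // timestamp. Adjust if out-of-order events need a lateness allowance.
  override def checkAndGetNextWatermark(lastElement: String, extractedTimestamp: Long): Watermark =
    new Watermark(extractedTimestamp)
}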

Thanks, vino.
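
Putting the two fixes together, the tail of main() would then read roughly
as follows (again a sketch under the same assumptions; note that
assignTimestampsAndWatermarks returns a new stream, which the original code
discarded):

    val myConsumer = env.addSource(new FlinkKafkaConsumer011[String](
      topicsValue, new SimpleStringSchema(), properties))

    // The consumer is already attached as a source; operate on the
    // resulting stream instead of calling env.addSource a second time.
    myConsumer
      .assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
      .print()

    env.execute("Flink simple output")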



Re: not found: type CustomWatermarkEmitter

Posted by Renjie Liu <li...@gmail.com>.
It seems that it's related to your development environment settings.

-- 
Liu, Renjie
Software Engineer, MVAD