Posted to user@flink.apache.org by "Castro, Fernando C." <Fe...@leidos.com> on 2020/03/03 21:24:59 UTC

Should I use a Sink or Connector? Or Both?

Hello folks! I’m new to Flink and data streaming in general, just initial FYI ;)

I’m currently doing this successfully:
1 - streaming data from Kafka in Flink
2 - aggregating the data with Flink’s sqlQuery API
3 - outputting the result of #2 into STDOUT via toRetractStream()

My objective is to change #3 so I’m upserting into an Elasticsearch index (see https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors for my complete code)

I’ve been using the template for the Elasticsearch connector https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")

But I’m confused from seeing some old examples online. Should I be using the Elasticsearch Sink (https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink) instead? Or both?

I’m having trouble with the current implementation where no data is outputting to Elasticsearch, but no error is being displayed in Flink (job status is RUNNING).

Hoping somebody could clarify what I’m missing? Thank you in advance!

Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10

Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

Posted by Jark Wu <im...@gmail.com>.
Hi Fernando,

Thanks for reporting back.
From my point of view, this is a shortcoming of the current Elasticsearch
connector, i.e. it doesn't work out of the box.
I created FLINK-16495 [1] to improve this by adding a default flush interval.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-16495

On Sat, 7 Mar 2020 at 00:20, Castro, Fernando C. <Fe...@leidos.com>
wrote:

> Arvid, thank you that was it!
>
> After setting these properties on my Elasticsearch connector, I was able
> to see the records upserting into ES!
>
>
> .bulkFlushMaxActions(2)
> .bulkFlushInterval(1000L)
>
>
>
> Thank you,
>
> Fernando
>
>
>
>
>
> *From: *Arvid Heise <ar...@ververica.com>
> *Date: *Thursday, March 5, 2020 at 2:27 AM
> *To: *"Castro, Fernando C. [US-US]" <Fe...@leidos.com>
> *Cc: *Jark Wu <im...@gmail.com>, John Smith <ja...@gmail.com>, "
> user@flink.apache.org" <us...@flink.apache.org>
> *Subject: *Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?
>
>
>
> Hi Fernando,
>
>
>
> How much data are you trying to write? If you just use single messages for
> testing, it could be that the default bulk settings are not working well.
>
>
>
> If so, could you please adjust the following settings and report back?
>
> public enum SinkOption {
>    BULK_FLUSH_MAX_ACTIONS,
>    BULK_FLUSH_MAX_SIZE,
>    BULK_FLUSH_INTERVAL
> }
>
>
>
> On Wed, Mar 4, 2020 at 3:05 PM Castro, Fernando C. <
> Fernando.Castro@leidos.com> wrote:
>
> Thank you guys. So I have no idea why data is not being pushed to
> Elasticsearch… ☹
>
>
>
> My complete code is at
> https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
>
> Btw, for some reason I still need to pass .documentType to the
> Elasticsearch connection descriptor (getting it from
> org.apache.flink.table.descriptors.Elasticsearch), when Elasticsearch 7
> doesn’t do types anymore.
>
>
>
> In case you can’t access stackoverflow for some reason, here is the code
> below too:
>
>
>
>
> /*
> * This Scala source file was generated by the Gradle 'init' task.
> */
> package flinkNamePull
>
> import java.time.LocalDateTime
> import java.util.Properties
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
> import org.apache.flink.api.common.functions.RichMapFunction
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.{DataTypes, Table}
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
> import org.apache.flink.types.Row
>
> object Demo {
>   /**
>    * MapFunction to generate Transfers POJOs from parsed CSV data.
>    */
>   class TransfersMapper extends RichMapFunction[String, Transfers] {
>     private var formatter = null
>
>     @throws[Exception]
>     override def open(parameters: Configuration): Unit = {
>       super.open(parameters)
>       //formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
>     }
>
>     @throws[Exception]
>     override def map(csvLine: String): Transfers = {
>       //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
>       var splitCsv = csvLine.stripLineEnd.split(",")
>
>       val arrLength = splitCsv.length
>       val i = 0
>       if (arrLength != 13) {
>         for (i <- arrLength + 1 to 13) {
>           if (i == 13) {
>             splitCsv = splitCsv :+ "0.0"
>           } else {
>             splitCsv = splitCsv :+ ""
>           }
>         }
>       }
>       var trans = new Transfers()
>       trans.rowId = splitCsv(0)
>       trans.subjectId = splitCsv(1)
>       trans.hadmId = splitCsv(2)
>       trans.icuStayId = splitCsv(3)
>       trans.dbSource = splitCsv(4)
>       trans.eventType = splitCsv(5)
>       trans.prev_careUnit = splitCsv(6)
>       trans.curr_careUnit = splitCsv(7)
>       trans.prev_wardId = splitCsv(8)
>       trans.curr_wardId = splitCsv(9)
>       trans.inTime = splitCsv(10)
>       trans.outTime = splitCsv(11)
>       trans.los = splitCsv(12).toDouble
>
>       return trans
>     }
>   }
>
>   def main(args: Array[String]) {
>     // Create streaming execution environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setParallelism(1)
>
>     // Set properties per KafkaConsumer API
>     val properties = new Properties()
>     properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
>     properties.setProperty("group.id", "test")
>
>     // Add Kafka source to environment
>     val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
>     // Read from beginning of topic
>     myKConsumer.setStartFromEarliest()
>
>     val streamSource = env
>       .addSource(myKConsumer)
>
>     // Transform CSV into a Transfers object
>     val streamTransfers = streamSource.map(new TransfersMapper())
>
>     // create a TableEnvironment
>     val tEnv = StreamTableEnvironment.create(env)
>
>     // register a Table
>     val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
>     tEnv.createTemporaryView("transfers", tblTransfers)
>
>     tEnv.connect(
>       new Elasticsearch()
>         .version("7")
>         .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http") // required: one or more Elasticsearch hosts to connect to
>         .index("transfers-sum")
>         .documentType("_doc") // not sure why this is still needed for ES7
>         .keyNullLiteral("n/a")
>     )
>       .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
>       .withSchema(new Schema()
>         .field("curr_careUnit", DataTypes.STRING())
>         .field("sum", DataTypes.DOUBLE())
>       )
>       .inUpsertMode()
>       .createTemporaryTable("transfersSum")
>
>     val result = tEnv.sqlQuery(
>       """
>         |SELECT curr_careUnit, sum(los)
>         |FROM transfers
>         |GROUP BY curr_careUnit
>         |""".stripMargin)
>
>     result.insertInto("transfersSum")
>
>     tEnv.toRetractStream[Row](result).print() //Just to see if something is actually happening (and it is)
>
>     env.execute("Flink Streaming Demo Dump to Elasticsearch")
>   }
> }
>
>
>
>
>
> Thank you,
>
> Fernando
>
>
>
>
>
> *From: *Jark Wu <im...@gmail.com>
> *Date: *Tuesday, March 3, 2020 at 8:51 PM
> *To: *John Smith <ja...@gmail.com>
> *Cc: *"Castro, Fernando C. [US-US]" <Fe...@leidos.com>, "
> user@flink.apache.org" <us...@flink.apache.org>
> *Subject: *EXTERNAL: Re: Should I use a Sink or Connector? Or Both?
>
>
>
> John is right.
>
>
>
> Could you provide more detailed code? So that we can help to investigate.
>
>
>
> Best,
>
> Jark
>
>
>
> On Wed, 4 Mar 2020 at 06:20, John Smith <ja...@gmail.com> wrote:
>
> The sink is for the Streaming API; it looks like you are using SQL and tables.
> So you can use the connector to output the table result to Elastic. Unless
> you want to convert from table to stream first.
>
>
>
> On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <
> Fernando.Castro@leidos.com> wrote:
>
> Hello folks! I’m new to Flink and data streaming in general, just initial
> FYI ;)
>
>
>
> I’m currently doing this successfully:
>
> 1 - streaming data from Kafka in Flink
>
> 2 - aggregating the data with Flink’s sqlQuery API
>
> 3 - outputting the result of #2 into STDOUT via toRetractStream()
>
>
>
> My objective is to change #3 so I’m upserting into an Elasticsearch index
> (see
> https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
> for my complete code)
>
>
>
> I’ve been using the template for the Elasticsearch connector
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
>
> tableEnvironment
>
>   .connect(...)
>
>   .withFormat(...)
>
>   .withSchema(...)
>
>   .inAppendMode()
>
>   .createTemporaryTable("MyTable")
>
>
>
> But I’m confused from seeing some old examples online. Should I be using
> the Elasticsearch Sink (
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink)
> instead? Or both?
>
>
>
> I’m having trouble with the current implementation where no data is
> outputting to Elasticsearch, but no error is being displayed in Flink (job
> status is RUNNING).
>
>
>
> Hoping somebody could clarify what I’m missing? Thank you in advance!
>
>
>
> Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10
>
>

Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

Posted by "Castro, Fernando C." <Fe...@leidos.com>.
Arvid, thank you that was it!
After setting these properties on my Elasticsearch connector, I was able to see the records upserting into ES!

.bulkFlushMaxActions(2)
.bulkFlushInterval(1000L)
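
For context, a minimal sketch of where these calls go in the Table API program from this thread, i.e. on the Elasticsearch descriptor (org.apache.flink.table.descriptors.Elasticsearch). The host, index, schema, and the 2-action / 1000 ms values are simply the ones reported in this thread, not recommended defaults:

tEnv.connect(
  new Elasticsearch()
    .version("7")
    .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")
    .index("transfers-sum")
    .documentType("_doc")
    .keyNullLiteral("n/a")
    .bulkFlushMaxActions(2)   // flush the buffered actions after every 2 records...
    .bulkFlushInterval(1000L) // ...or at least once per second, whichever comes first
)
  .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
  .withSchema(new Schema()
    .field("curr_careUnit", DataTypes.STRING())
    .field("sum", DataTypes.DOUBLE())
  )
  .inUpsertMode()
  .createTemporaryTable("transfersSum")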

Thank you,
Fernando


From: Arvid Heise <ar...@ververica.com>
Date: Thursday, March 5, 2020 at 2:27 AM
To: "Castro, Fernando C. [US-US]" <Fe...@leidos.com>
Cc: Jark Wu <im...@gmail.com>, John Smith <ja...@gmail.com>, "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

Hi Fernando,

How much data are you trying to write? If you just use single messages for testing, it could be that the default bulk settings are not working well.

If so, could you please adjust the following settings and report back?

public enum SinkOption {
   BULK_FLUSH_MAX_ACTIONS,
   BULK_FLUSH_MAX_SIZE,
   BULK_FLUSH_INTERVAL
}

On Wed, Mar 4, 2020 at 3:05 PM Castro, Fernando C. <Fe...@leidos.com>> wrote:
Thank you guys. So I have no idea why data is not being pushed to Elasticsearch… ☹

My complete code is at https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
Btw, for some reason I still need to pass .documentType to the Elasticsearch connection descriptor (getting it from org.apache.flink.table.descriptors.Elasticsearch), when Elasticsearch 7 doesn’t do types anymore.

In case you can’t access stackoverflow for some reason, here is the code below too:
/*
* This Scala source file was generated by the Gradle 'init' task.
*/
package flinkNamePull

import java.time.LocalDateTime
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
import org.apache.flink.types.Row

object Demo {
  /**
   * MapFunction to generate Transfers POJOs from parsed CSV data.
   */
  class TransfersMapper extends RichMapFunction[String, Transfers] {
    private var formatter = null

    @throws[Exception]
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      //formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    }

    @throws[Exception]
    override def map(csvLine: String): Transfers = {
      //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
      var splitCsv = csvLine.stripLineEnd.split(",")

      val arrLength = splitCsv.length
      val i = 0
      if (arrLength != 13) {
        for (i <- arrLength + 1 to 13) {
          if (i == 13) {
            splitCsv = splitCsv :+ "0.0"
          } else {
            splitCsv = splitCsv :+ ""
          }
        }
      }
      var trans = new Transfers()
      trans.rowId = splitCsv(0)
      trans.subjectId = splitCsv(1)
      trans.hadmId = splitCsv(2)
      trans.icuStayId = splitCsv(3)
      trans.dbSource = splitCsv(4)
      trans.eventType = splitCsv(5)
      trans.prev_careUnit = splitCsv(6)
      trans.curr_careUnit = splitCsv(7)
      trans.prev_wardId = splitCsv(8)
      trans.curr_wardId = splitCsv(9)
      trans.inTime = splitCsv(10)
      trans.outTime = splitCsv(11)
      trans.los = splitCsv(12).toDouble

      return trans
    }
  }

  def main(args: Array[String]) {
    // Create streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Set properties per KafkaConsumer API
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
    properties.setProperty("group.id<http://group.id>", "test")

    // Add Kafka source to environment
    val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
    // Read from beginning of topic
    myKConsumer.setStartFromEarliest()

    val streamSource = env
      .addSource(myKConsumer)

    // Transform CSV into a Transfers object
    val streamTransfers = streamSource.map(new TransfersMapper())

    // create a TableEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // register a Table
    val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
    tEnv.createTemporaryView("transfers", tblTransfers)

    tEnv.connect(
      new Elasticsearch()
        .version("7")
        .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
        .index("transfers-sum")
        .documentType("_doc") // not sure why this is still needed for ES7
        .keyNullLiteral("n/a")
    )
      .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
      .withSchema(new Schema()
        .field("curr_careUnit", DataTypes.STRING())
        .field("sum", DataTypes.DOUBLE())
      )
      .inUpsertMode()
      .createTemporaryTable("transfersSum")

    val result = tEnv.sqlQuery(
      """
        |SELECT curr_careUnit, sum(los)
        |FROM transfers
        |GROUP BY curr_careUnit
        |""".stripMargin)

    result.insertInto("transfersSum")

    tEnv.toRetractStream[Row](result).print() //Just to see if something is actually happening (and it is)

    env.execute("Flink Streaming Demo Dump to Elasticsearch")
  }
}


Thank you,
Fernando


From: Jark Wu <im...@gmail.com>>
Date: Tuesday, March 3, 2020 at 8:51 PM
To: John Smith <ja...@gmail.com>>
Cc: "Castro, Fernando C. [US-US]" <Fe...@leidos.com>>, "user@flink.apache.org<ma...@flink.apache.org>" <us...@flink.apache.org>>
Subject: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

John is right.

Could you provide more detailed code? So that we can help to investigate.

Best,
Jark

On Wed, 4 Mar 2020 at 06:20, John Smith <ja...@gmail.com>> wrote:
The sink is for the Streaming API; it looks like you are using SQL and tables. So you can use the connector to output the table result to Elastic. Unless you want to convert from table to stream first.

On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <Fe...@leidos.com>> wrote:
Hello folks! I’m new to Flink and data streaming in general, just initial FYI ;)

I’m currently doing this successfully:
1 - streaming data from Kafka in Flink
2 - aggregating the data with Flink’s sqlQuery API
3 - outputting the result of #2 into STDOUT via toRetractStream()

My objective is to change #3 so I’m upserting into an Elasticsearch index (see https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors for my complete code)

I’ve been using the template for the Elasticsearch connector https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")

But I’m confused from seeing some old examples online. Should I be using the Elasticsearch Sink (https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink) instead? Or both?

I’m having trouble with the current implementation where no data is outputting to Elasticsearch, but no error is being displayed in Flink (job status is RUNNING).

Hoping somebody could clarify what I’m missing? Thank you in advance!

Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10

Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

Posted by Arvid Heise <ar...@ververica.com>.
Hi Fernando,

How much data are you trying to write? If you just use single messages for
testing, it could be that the default bulk settings are not working well.

If so, could you please adjust the following settings and report back?

public enum SinkOption {
   BULK_FLUSH_MAX_ACTIONS,
   BULK_FLUSH_MAX_SIZE,
   BULK_FLUSH_INTERVAL
}


On Wed, Mar 4, 2020 at 3:05 PM Castro, Fernando C. <
Fernando.Castro@leidos.com> wrote:

> Thank you guys. So I have no idea why data is not being pushed to
> Elasticsearch… ☹
>
>
>
> My complete code is at
> https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
>
> Btw, for some reason I still need to pass .documentType to the
> Elasticsearch connection descriptor (getting it from
> org.apache.flink.table.descriptors.Elasticsearch), when Elasticsearch 7
> doesn’t do types anymore.
>
>
>
> In case you can’t access stackoverflow for some reason, here is the code
> below too:
>
>
>
>
> /*
> * This Scala source file was generated by the Gradle 'init' task.
> */
> package flinkNamePull
>
> import java.time.LocalDateTime
> import java.util.Properties
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
> import org.apache.flink.api.common.functions.RichMapFunction
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.{DataTypes, Table}
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
> import org.apache.flink.types.Row
>
> object Demo {
>   /**
>    * MapFunction to generate Transfers POJOs from parsed CSV data.
>    */
>   class TransfersMapper extends RichMapFunction[String, Transfers] {
>     private var formatter = null
>
>     @throws[Exception]
>     override def open(parameters: Configuration): Unit = {
>       super.open(parameters)
>       //formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
>     }
>
>     @throws[Exception]
>     override def map(csvLine: String): Transfers = {
>       //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
>       var splitCsv = csvLine.stripLineEnd.split(",")
>
>       val arrLength = splitCsv.length
>       val i = 0
>       if (arrLength != 13) {
>         for (i <- arrLength + 1 to 13) {
>           if (i == 13) {
>             splitCsv = splitCsv :+ "0.0"
>           } else {
>             splitCsv = splitCsv :+ ""
>           }
>         }
>       }
>       var trans = new Transfers()
>       trans.rowId = splitCsv(0)
>       trans.subjectId = splitCsv(1)
>       trans.hadmId = splitCsv(2)
>       trans.icuStayId = splitCsv(3)
>       trans.dbSource = splitCsv(4)
>       trans.eventType = splitCsv(5)
>       trans.prev_careUnit = splitCsv(6)
>       trans.curr_careUnit = splitCsv(7)
>       trans.prev_wardId = splitCsv(8)
>       trans.curr_wardId = splitCsv(9)
>       trans.inTime = splitCsv(10)
>       trans.outTime = splitCsv(11)
>       trans.los = splitCsv(12).toDouble
>
>       return trans
>     }
>   }
>
>   def main(args: Array[String]) {
>     // Create streaming execution environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setParallelism(1)
>
>     // Set properties per KafkaConsumer API
>     val properties = new Properties()
>     properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
>     properties.setProperty("group.id", "test")
>
>     // Add Kafka source to environment
>     val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
>     // Read from beginning of topic
>     myKConsumer.setStartFromEarliest()
>
>     val streamSource = env
>       .addSource(myKConsumer)
>
>     // Transform CSV into a Transfers object
>     val streamTransfers = streamSource.map(new TransfersMapper())
>
>     // create a TableEnvironment
>     val tEnv = StreamTableEnvironment.create(env)
>
>     // register a Table
>     val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
>     tEnv.createTemporaryView("transfers", tblTransfers)
>
>     tEnv.connect(
>       new Elasticsearch()
>         .version("7")
>         .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http") // required: one or more Elasticsearch hosts to connect to
>         .index("transfers-sum")
>         .documentType("_doc") // not sure why this is still needed for ES7
>         .keyNullLiteral("n/a")
>     )
>       .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
>       .withSchema(new Schema()
>         .field("curr_careUnit", DataTypes.STRING())
>         .field("sum", DataTypes.DOUBLE())
>       )
>       .inUpsertMode()
>       .createTemporaryTable("transfersSum")
>
>     val result = tEnv.sqlQuery(
>       """
>         |SELECT curr_careUnit, sum(los)
>         |FROM transfers
>         |GROUP BY curr_careUnit
>         |""".stripMargin)
>
>     result.insertInto("transfersSum")
>
>     tEnv.toRetractStream[Row](result).print() //Just to see if something is actually happening (and it is)
>
>     env.execute("Flink Streaming Demo Dump to Elasticsearch")
>   }
> }
>
>
>
>
>
> Thank you,
>
> Fernando
>
>
>
>
>
> *From: *Jark Wu <im...@gmail.com>
> *Date: *Tuesday, March 3, 2020 at 8:51 PM
> *To: *John Smith <ja...@gmail.com>
> *Cc: *"Castro, Fernando C. [US-US]" <Fe...@leidos.com>, "
> user@flink.apache.org" <us...@flink.apache.org>
> *Subject: *EXTERNAL: Re: Should I use a Sink or Connector? Or Both?
>
>
>
> John is right.
>
>
>
> Could you provide more detailed code? So that we can help to investigate.
>
>
>
> Best,
>
> Jark
>
>
>
> On Wed, 4 Mar 2020 at 06:20, John Smith <ja...@gmail.com> wrote:
>
> The sink is for the Streaming API; it looks like you are using SQL and tables.
> So you can use the connector to output the table result to Elastic. Unless
> you want to convert from table to stream first.
>
>
>
> On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <
> Fernando.Castro@leidos.com> wrote:
>
> Hello folks! I’m new to Flink and data streaming in general, just initial
> FYI ;)
>
>
>
> I’m currently doing this successfully:
>
> 1 - streaming data from Kafka in Flink
>
> 2 - aggregating the data with Flink’s sqlQuery API
>
> 3 - outputting the result of #2 into STDOUT via toRetractStream()
>
>
>
> My objective is to change #3 so I’m upserting into an Elasticsearch index
> (see
> https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
> for my complete code)
>
>
>
> I’ve been using the template for the Elasticsearch connector
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
>
> tableEnvironment
>
>   .connect(...)
>
>   .withFormat(...)
>
>   .withSchema(...)
>
>   .inAppendMode()
>
>   .createTemporaryTable("MyTable")
>
>
>
> But I’m confused from seeing some old examples online. Should I be using
> the Elasticsearch Sink (
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink)
> instead? Or both?
>
>
>
> I’m having trouble with the current implementation where no data is
> outputting to Elasticsearch, but no error is being displayed in Flink (job
> status is RUNNING).
>
>
>
> Hoping somebody could clarify what I’m missing? Thank you in advance!
>
>
>
> Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10
>
>

Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

Posted by "Castro, Fernando C." <Fe...@leidos.com>.
Thank you guys. So I have no idea why data is not being pushed to Elasticsearch… ☹

My complete code is at https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
Btw, for some reason I still need to pass .documentType to the Elasticsearch connection descriptor (getting it from org.apache.flink.table.descriptors.Elasticsearch), when Elasticsearch 7 doesn’t do types anymore.

In case you can’t access stackoverflow for some reason, here is the code below too:
/*
* This Scala source file was generated by the Gradle 'init' task.
*/
package flinkNamePull

import java.time.LocalDateTime
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
import org.apache.flink.types.Row

object Demo {
  /**
   * MapFunction to generate Transfers POJOs from parsed CSV data.
   */
  class TransfersMapper extends RichMapFunction[String, Transfers] {
    private var formatter = null

    @throws[Exception]
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      //formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    }

    @throws[Exception]
    override def map(csvLine: String): Transfers = {
      //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
      var splitCsv = csvLine.stripLineEnd.split(",")

      val arrLength = splitCsv.length
      val i = 0
      if (arrLength != 13) {
        for (i <- arrLength + 1 to 13) {
          if (i == 13) {
            splitCsv = splitCsv :+ "0.0"
          } else {
            splitCsv = splitCsv :+ ""
          }
        }
      }
      var trans = new Transfers()
      trans.rowId = splitCsv(0)
      trans.subjectId = splitCsv(1)
      trans.hadmId = splitCsv(2)
      trans.icuStayId = splitCsv(3)
      trans.dbSource = splitCsv(4)
      trans.eventType = splitCsv(5)
      trans.prev_careUnit = splitCsv(6)
      trans.curr_careUnit = splitCsv(7)
      trans.prev_wardId = splitCsv(8)
      trans.curr_wardId = splitCsv(9)
      trans.inTime = splitCsv(10)
      trans.outTime = splitCsv(11)
      trans.los = splitCsv(12).toDouble

      return trans
    }
  }

  def main(args: Array[String]) {
    // Create streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Set properties per KafkaConsumer API
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
    properties.setProperty("group.id", "test")

    // Add Kafka source to environment
    val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
    // Read from beginning of topic
    myKConsumer.setStartFromEarliest()

    val streamSource = env
      .addSource(myKConsumer)

    // Transform CSV into a Transfers object
    val streamTransfers = streamSource.map(new TransfersMapper())

    // create a TableEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // register a Table
    val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
    tEnv.createTemporaryView("transfers", tblTransfers)

    tEnv.connect(
      new Elasticsearch()
        .version("7")
        .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
        .index("transfers-sum")
        .documentType("_doc") // not sure why this is still needed for ES7
        .keyNullLiteral("n/a")
    )
      .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
      .withSchema(new Schema()
        .field("curr_careUnit", DataTypes.STRING())
        .field("sum", DataTypes.DOUBLE())
      )
      .inUpsertMode()
      .createTemporaryTable("transfersSum")

    val result = tEnv.sqlQuery(
      """
        |SELECT curr_careUnit, sum(los)
        |FROM transfers
        |GROUP BY curr_careUnit
        |""".stripMargin)

    result.insertInto("transfersSum")

    tEnv.toRetractStream[Row](result).print() //Just to see if something is actually happening (and it is)

    env.execute("Flink Streaming Demo Dump to Elasticsearch")
  }
}


Thank you,
Fernando


From: Jark Wu <im...@gmail.com>
Date: Tuesday, March 3, 2020 at 8:51 PM
To: John Smith <ja...@gmail.com>
Cc: "Castro, Fernando C. [US-US]" <Fe...@leidos.com>, "user@flink.apache.org" <us...@flink.apache.org>
Subject: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

John is right.

Could you provide more detailed code? So that we can help to investigate.

Best,
Jark

On Wed, 4 Mar 2020 at 06:20, John Smith <ja...@gmail.com>> wrote:
The sink is for the Streaming API; it looks like you are using SQL and tables. So you can use the connector to output the table result to Elastic. Unless you want to convert from table to stream first.

On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <Fe...@leidos.com>> wrote:
Hello folks! I’m new to Flink and data streaming in general, just initial FYI ;)

I’m currently doing this successfully:
1 - streaming data from Kafka in Flink
2 - aggregating the data with Flink’s sqlQuery API
3 - outputting the result of #2 into STDOUT via toRetractStream()

My objective is to change #3 so I’m upserting into an Elasticsearch index (see https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors for my complete code)

I’ve been using the template for the Elasticsearch connector https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")

But I’m confused from seeing some old examples online. Should I be using the Elasticsearch Sink (https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink) instead? Or both?

I’m having trouble with the current implementation where no data is outputting to Elasticsearch, but no error is being displayed in Flink (job status is RUNNING).

Hoping somebody could clarify what I’m missing? Thank you in advance!

Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10

Re: Should I use a Sink or Connector? Or Both?

Posted by Jark Wu <im...@gmail.com>.
John is right.

Could you provide more detailed code? So that we can help to investigate.

Best,
Jark

On Wed, 4 Mar 2020 at 06:20, John Smith <ja...@gmail.com> wrote:

> The sink is for the Streaming API; it looks like you are using SQL and tables.
> So you can use the connector to output the table result to Elastic. Unless
> you want to convert from table to stream first.
>
> On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <
> Fernando.Castro@leidos.com> wrote:
>
>> Hello folks! I’m new to Flink and data streaming in general, just initial
>> FYI ;)
>>
>>
>>
>> I’m currently doing this successfully:
>>
>> 1 - streaming data from Kafka in Flink
>>
>> 2 - aggregating the data with Flink’s sqlQuery API
>>
>> 3 - outputting the result of #2 into STDOUT via toRetractStream()
>>
>>
>>
>> My objective is to change #3 so I’m upserting into an Elasticsearch index
>> (see
>> https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
>> for my complete code)
>>
>>
>>
>> I’ve been using the template for the Elasticsearch connector
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
>>
>> tableEnvironment
>>
>>   .connect(...)
>>
>>   .withFormat(...)
>>
>>   .withSchema(...)
>>
>>   .inAppendMode()
>>
>>   .createTemporaryTable("MyTable")
>>
>>
>>
>> But I’m confused from seeing some old examples online. Should I be using
>> the Elasticsearch Sink (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink)
>> instead? Or both?
>>
>>
>>
>> I’m having trouble with the current implementation where no data is
>> outputting to Elasticsearch, but no error is being displayed in Flink (job
>> status is RUNNING).
>>
>>
>>
>> Hoping somebody could clarify what I’m missing? Thank you in advance!
>>
>>
>>
>> Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10
>>
>

Re: Should I use a Sink or Connector? Or Both?

Posted by John Smith <ja...@gmail.com>.
The sink is for the Streaming API; it looks like you are using SQL and tables.
So you can use the connector to output the table result to Elastic. Unless
you want to convert from table to stream first.
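
To illustrate that second option, here is a rough, untested sketch of converting the table result to a retract stream and writing it with the DataStream ElasticsearchSink from flink-connector-elasticsearch7. The tEnv, result, host, and index names are taken from the code shared elsewhere in this thread; the index-request construction and the flush-after-every-action setting are assumptions for the sketch, and a real job would also handle the retract flag instead of only indexing inserts:

import java.util

import org.apache.http.HttpHost
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.flink.types.Row
import org.elasticsearch.client.Requests

val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http"))

val esSinkBuilder = new ElasticsearchSink.Builder[(Boolean, Row)](
  httpHosts,
  new ElasticsearchSinkFunction[(Boolean, Row)] {
    override def process(element: (Boolean, Row), ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      // element._1 == false marks a retraction; a complete upsert would delete/update instead
      if (element._1) {
        val json = new util.HashMap[String, AnyRef]()
        json.put("curr_careUnit", element._2.getField(0))
        json.put("sum", element._2.getField(1))
        indexer.add(Requests.indexRequest().index("transfers-sum").source(json))
      }
    }
  })
// flush after every action so small test volumes show up immediately
esSinkBuilder.setBulkFlushMaxActions(1)

tEnv.toRetractStream[Row](result).addSink(esSinkBuilder.build())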

On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <Fe...@leidos.com>
wrote:

> Hello folks! I’m new to Flink and data streaming in general, just initial
> FYI ;)
>
>
>
> I’m currently doing this successfully:
>
> 1 - streaming data from Kafka in Flink
>
> 2 - aggregating the data with Flink’s sqlQuery API
>
> 3 - outputting the result of #2 into STDOUT via toRetractStream()
>
>
>
> My objective is to change #3 so I’m upserting into an Elasticsearch index
> (see
> https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
> for my complete code)
>
>
>
> I’ve been using the template for the Elasticsearch connector
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
>
> tableEnvironment
>
>   .connect(...)
>
>   .withFormat(...)
>
>   .withSchema(...)
>
>   .inAppendMode()
>
>   .createTemporaryTable("MyTable")
>
>
>
> But I’m confused from seeing some old examples online. Should I be using
> the Elasticsearch Sink (
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink)
> instead? Or both?
>
>
>
> I’m having trouble with the current implementation where no data is
> outputting to Elasticsearch, but no error is being displayed in Flink (job
> status is RUNNING).
>
>
>
> Hoping somebody could clarify what I’m missing? Thank you in advance!
>
>
>
> Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10
>