Posted to user@flink.apache.org by Rion Williams <ri...@gmail.com> on 2021/08/08 14:23:59 UTC

Dynamic Cluster/Index Routing for Elasticsearch Sink

Hi folks,

I have a use case that I wanted to pose to the mailing list first, as I’m not terribly familiar with the Elasticsearch connector and want to ensure I’m not going down the wrong path trying to accomplish this in Flink (or whether something downstream might be a better option).

Basically, I have the following pieces to the puzzle:

   - A stream of tenant-specific events
   - An HTTP endpoint containing mappings for tenant-specific Elastic cluster information (as each tenant has its own specific Elastic cluster/index)

What I’m hoping to accomplish is the following:

   1. One stream will periodically poll the HTTP endpoint and store these cluster mappings in state (keyed by tenant with cluster info as the value)
   2. The event stream will be keyed by tenant and connected to the cluster mappings stream.
   3. I’ll need an Elasticsearch sink that can route the tenant-specific event data to its corresponding cluster/index from the mapping source.

I know that the existing Elasticsearch sink supports dynamic indices; however, I don’t know whether it’s possible to vary the cluster on a per-tenant basis as I would need, or whether there’s a better approach here.
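
To make the per-tenant cluster routing idea concrete, here is a minimal plain-Java sketch with no Flink or Elasticsearch dependency. It only illustrates one possible shape: lazily create one writer per distinct cluster and reuse it afterwards. Every name in it (RoutingSketch, Event, ClusterWriter, DynamicRouter, the host strings) is hypothetical and made up for illustration; it is not the connector's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoutingSketch {

    /** Hypothetical tenant event that already carries its routing information. */
    static final class Event {
        final String hosts;   // e.g. "es-a:9200"; doubles as the cache key
        final String index;
        final String payload;

        Event(String hosts, String index, String payload) {
            this.hosts = hosts;
            this.index = index;
            this.payload = payload;
        }
    }

    /** Hypothetical stand-in for one Elasticsearch sink bound to one cluster. */
    static final class ClusterWriter {
        final String hosts;
        final List<String> written = new ArrayList<>();

        ClusterWriter(String hosts) {
            this.hosts = hosts;
        }

        void write(String index, String payload) {
            // A real sink would hand an index request to its client here.
            written.add(index + " <- " + payload);
        }
    }

    /** Lazily creates one writer per distinct cluster and reuses it afterwards. */
    static final class DynamicRouter {
        private final Map<String, ClusterWriter> writers = new HashMap<>();

        void route(Event event) {
            ClusterWriter writer = writers.computeIfAbsent(event.hosts, ClusterWriter::new);
            writer.write(event.index, event.payload);
        }

        int openWriters() {
            return writers.size();
        }

        ClusterWriter writerFor(String hosts) {
            return writers.get(hosts);
        }
    }

    public static void main(String[] args) {
        DynamicRouter router = new DynamicRouter();
        router.route(new Event("es-a:9200", "tenant-a-events", "{\"id\":1}"));
        router.route(new Event("es-b:9200", "tenant-b-events", "{\"id\":2}"));
        router.route(new Event("es-a:9200", "tenant-a-events", "{\"id\":3}"));

        // Two distinct clusters were seen, so two writers exist; es-a received two events.
        System.out.println("writers: " + router.openWriters());
        System.out.println("es-a writes: " + router.writerFor("es-a:9200").written.size());
    }
}
```

The cache here grows with the number of distinct clusters, so a real implementation would likely need some way to bound or evict idle writers.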

Any advice would be appreciated.

Thanks,

Rion

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

Posted by Arvid Heise <ar...@apache.org>.
Hi Rion,

Thank you very much for the contribution. We are currently still busy with
the 1.14 release but can pick up the review after that. David, if you have
capacity, we would be grateful for any help.

In general, we plan to port ES from SinkFunction to the new Sink interface
in 1.15 but I don't see a reason to let that interfere with the PR.

On Mon, Sep 6, 2021 at 9:29 AM David Morávek <dm...@apache.org> wrote:

> Hi Rion,
>
> thanks for opening the PR. I'll take a look at it this week. I'd also pull
> Arvid into this topic to see whether he has any comments.
>
> Best,
> D.
>
> On Sat, Sep 4, 2021 at 9:10 PM Rion Williams <ri...@gmail.com>
> wrote:
>
>> Hi again David et al,
>>
>> I managed to push an initial pull request for the implementations for
>> the DynamicElasticsearchSink and related ElasticsearchSinkRouter last week
>> <https://github.com/apache/flink/pull/17061> and made some minor updates
>> today with regards to the Javadocs (included code examples, etc.) along
>> with a few tests that came to mind. I was hoping to get a few more eyes on
>> it and figure out what else might be worth adding/changing/documenting in
>> hopes of wrapping this feature up.
>>
>> Thanks again to everyone in this incredible community for their
>> assistance with this, a local implementation of it for a project of mine is
>> working like a charm, so I'm hoping it's something that others will be able
>> to leverage for their own needs.
>>
>> Rion
>>
>> On Thu, Aug 26, 2021 at 11:45 AM David Morávek <dm...@apache.org> wrote:
>>
>>> Hi Rion,
>>>
>>> personally I'd start with a unit test in the base module using a test sink
>>> implementation. There is already *DummyElasticsearchSink* that you may
>>> be able to reuse (just note that we're trying to get rid of Mockito-based
>>> tests such as this one).
>>>
>>> I'm a bit unsure that an integration test would actually test anything extra
>>> that the unit test doesn't in this case, so I'd recommend it as the next
>>> step (I'm also a bit concerned that this test would take a long time to
>>> execute / be resource intensive as it would need to spawn more Elastic
>>> clusters?).
>>>
>>> Best,
>>> D.
>>>
>>> On Thu, Aug 26, 2021 at 5:47 PM Rion Williams <ri...@gmail.com>
>>> wrote:
>>>
>>>> Just chiming in on this again.
>>>>
>>>> I think I have the pieces in place regarding the implementation (both a
>>>> DynamicElasticsearchSink class and ElasticsearchSinkRouter interface) added
>>>> to the elasticsearch-base module. I noticed that HttpHost wasn't available
>>>> within that module/in the tests, so I'd suspect that I'd need to add a
>>>> dependency similar to those found within the specific ES implementations
>>>> (5/6/7). I'd also assume that it may be best to just provide a dummy sink
>>>> similar to the other patterning to handle writing the unit tests or would
>>>> you recommend separate Elasticsearch integration tests using a
>>>> TestContainer of each supported version (5/6/7) similar to those within the
>>>> ElasticsearchSinkITCase files under each module?
>>>>
>>>> Any advice / recommendations on this front would be helpful. I want to
>>>> write some tests surrounding this that demonstrate the most common
>>>> use-cases, but also don't want to go overkill.
>>>>
>>>> Thanks again for all of your help,
>>>>
>>>> Rion
>>>>
>>>> On Wed, Aug 25, 2021 at 2:10 PM Rion Williams <ri...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks again David,
>>>>>
>>>>> I've spun up a JIRA issue for the ticket
>>>>> <https://issues.apache.org/jira/browse/FLINK-23977> while I work on
>>>>> getting things into the proper state. If someone with the
>>>>> appropriate privileges could assign it to me, I'd be appreciative. I'll
>>>>> likely need some assistance at a few points to ensure things look as
>>>>> expected, but I'm happy to help with this contribution.
>>>>>
>>>>> Rion
>>>>>
>>>>> On Wed, Aug 25, 2021 at 11:37 AM David Morávek <dm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> AFAIK there are currently no other sources in Flink that can treat
>>>>>> "other sources" / "destination" as data. The most complete generic work on
>>>>>> this topic that I'm aware of is the Splittable DoFn based IOs in Apache Beam.
>>>>>>
>>>>>> I think the best module for the contribution would be
>>>>>> "elasticsearch-base", because this could be easily reused for all ES
>>>>>> versions that we currently support.
>>>>>>
>>>>>> Best,
>>>>>> D.
>>>>>>
>>>>>> On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <ri...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> That was perfect and it looks like this is working as I'd expected.
>>>>>>> I put together some larger integration tests for my specific use-case
>>>>>>> (multiple Elasticsearch clusters running in TestContainers) and verified
>>>>>>> that messages were being routed dynamically to the appropriate sinks. I
>>>>>>> forked the Flink repo last night and was trying to figure out the best
>>>>>>> place to start adding these classes in (I noticed that there were three
>>>>>>> separate ES packages targeting 5/6/7 respectively). I was going to try to
>>>>>>> start fleshing out the initial implementation for this, but wanted to make sure
>>>>>>> that I was starting in the right place.
>>>>>>>
>>>>>>> Additionally, do you know of anything that might be similar to this
>>>>>>> even within other sinks? Just trying to think of something to model this
>>>>>>> after. Once I get things started, I'll spin up a JIRA issue for it and go
>>>>>>> from there.
>>>>>>>
>>>>>>> Thanks so much for your help!
>>>>>>>
>>>>>>> Rion
>>>>>>>
>>>>>>> On Tue, Aug 24, 2021 at 1:45 AM David Morávek <dm...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Rion,
>>>>>>>>
>>>>>>>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>>>>>>>> before opening the child sink. Please see *AbstractRichFunction*
>>>>>>>> [1] (that ElasticsearchSink extends) for more details.
>>>>>>>>
>>>>>>>> One more note: instead of starting with an integration test, I'd
>>>>>>>> recommend writing a unit test using the *operator test harness* [2]
>>>>>>>> first. This should help you to discover / debug many issues upfront. You
>>>>>>>> can use *ElasticsearchSinkBaseTest* [3] as an example.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>>>>>>>> [2]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>>>>>>>> [3]
>>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> D.
>>>>>>>>
>>>>>>>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <
>>>>>>>> rionmonster@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi David,
>>>>>>>>>
>>>>>>>>> Thanks again for the response. I believe that I'm getting pretty
>>>>>>>>> close to at least a POC-level implementation of this. Currently, I'm
>>>>>>>>> working with JsonObject instances throughout the pipeline, so I wanted to
>>>>>>>>> try this out and simply stored the routing information within the element
>>>>>>>>> itself for simplicity's sake right now, so it has a shape that looks
>>>>>>>>> something like this:
>>>>>>>>>
>>>>>>>>> {
>>>>>>>>>     "route": {
>>>>>>>>>         "hosts": "...",
>>>>>>>>>         "index": "...",
>>>>>>>>>         ...
>>>>>>>>>     },
>>>>>>>>>     "all-other-fields-here"
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> And I've stripped back several of the layers of the routers (since
>>>>>>>>> I already have all of the information in the element at that point). I
>>>>>>>>> tried using something like this:
>>>>>>>>>
>>>>>>>>> class DynamicElasticsearchSink: RichSinkFunction<JsonObject>(), CheckpointedFunction {
>>>>>>>>>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>>>>>>>>     private lateinit var configuration: Configuration
>>>>>>>>>
>>>>>>>>>     override fun open(parameters: Configuration) {
>>>>>>>>>         configuration = parameters
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>>>>>>>>>         val route = getHost(element)
>>>>>>>>>         // Check if we already have a router for this cluster
>>>>>>>>>         var sink = sinkRoutes[route]
>>>>>>>>>         if (sink == null) {
>>>>>>>>>             // If not, create one
>>>>>>>>>             sink = buildSinkFromRoute(element)
>>>>>>>>>             sink.open(configuration)
>>>>>>>>>             sinkRoutes[route] = sink
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>         sink.invoke(element, context)
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     override fun initializeState(context: FunctionInitializationContext) {
>>>>>>>>>         // No-op.
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>>>>>>>>         // This is used only to flush pending writes.
>>>>>>>>>         for (sink in sinkRoutes.values) {
>>>>>>>>>             sink.snapshotState(context)
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     override fun close() {
>>>>>>>>>         for (sink in sinkRoutes.values) {
>>>>>>>>>             sink.close()
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>>>>>>>>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>>>>>>>>             buildHostsFromElement(element),
>>>>>>>>>             ElasticsearchRoutingFunction()
>>>>>>>>>         )
>>>>>>>>>
>>>>>>>>>         builder.setBulkFlushMaxActions(1)
>>>>>>>>>
>>>>>>>>>         // TODO: Configure authorization if available
>>>>>>>>> //        builder.setRestClientFactory { restClient ->
>>>>>>>>> //            restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
>>>>>>>>> //                override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>>>>>>>>> //                    // Configure authorization here
>>>>>>>>> //                    val credentialsProvider = BasicCredentialsProvider().apply {
>>>>>>>>> //                        setCredentials(
>>>>>>>>> //                            AuthScope.ANY,
>>>>>>>>> //                            UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
>>>>>>>>> //                        )
>>>>>>>>> //                    }
>>>>>>>>> //
>>>>>>>>> //                    return builder.setDefaultCredentialsProvider(credentialsProvider);
>>>>>>>>> //                }
>>>>>>>>> //            })
>>>>>>>>> //        }
>>>>>>>>>
>>>>>>>>>         return builder.build()
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost>{
>>>>>>>>>         val transportAddresses = element
>>>>>>>>>             .get("route").asJsonObject
>>>>>>>>>             .get("hosts").asString
>>>>>>>>>
>>>>>>>>>         // If there are multiple, they should be comma-delimited
>>>>>>>>>         val addresses = transportAddresses.split(",")
>>>>>>>>>         return addresses
>>>>>>>>>             .filter { address -> address.isNotEmpty() }
>>>>>>>>>             .map { address ->
>>>>>>>>>                 HttpHost.create(address)
>>>>>>>>>             }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     private fun getHost(element: JsonObject): String {
>>>>>>>>>         return element
>>>>>>>>>             .get("route").asJsonObject
>>>>>>>>>             .get("hosts").asString
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     private class ElasticsearchRoutingFunction: ElasticsearchSinkFunction<JsonObject> {
>>>>>>>>>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>>>>>>>>>             indexer.add(request(element))
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>         private fun request(element: JsonObject): IndexRequest {
>>>>>>>>>             // Access routing information
>>>>>>>>>             val index = element
>>>>>>>>>                 .get("route").asJsonObject
>>>>>>>>>                 .get("index").asString
>>>>>>>>>
>>>>>>>>>             // Strip off routing information
>>>>>>>>>             element.remove("route")
>>>>>>>>>
>>>>>>>>>             // Send the request
>>>>>>>>>             return Requests.indexRequest()
>>>>>>>>>                 .index(index)
>>>>>>>>>                 .type("_doc")
>>>>>>>>>                 .source(mapOf(
>>>>>>>>>                     "data" to "$element"
>>>>>>>>>                 ))
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> After running an integration test, I keep running into the following
>>>>>>>>> error during the invocation of the child sink:
>>>>>>>>>
>>>>>>>>> // The runtime context has not been initialized.
>>>>>>>>> sink.invoke(element, context)
>>>>>>>>>
>>>>>>>>> I can see the underlying sink getting initialized, the open call
>>>>>>>>> being made, etc.; however, for some reason it looks like there's an issue
>>>>>>>>> related to the context during the invoke call, namely *"The
>>>>>>>>> runtime context has not been initialized"*. I had assumed this
>>>>>>>>> would be alright since the context for the "wrapper" should have already
>>>>>>>>> been initialized, but maybe there's something that I'm missing.
>>>>>>>>>
>>>>>>>>> Also, please forgive any hastily written or nasty code as this is
>>>>>>>>> purely a POC to see if I could get this to work using a single object. I
>>>>>>>>> have the hopes of cleaning it up and genericizing it after I am confident
>>>>>>>>> that it actually works.
>>>>>>>>>
>>>>>>>>> Thanks so much again,
>>>>>>>>>
>>>>>>>>> Rion
>>>>>>>>>
>>>>>>>>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <dm...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Rion,
>>>>>>>>>>
>>>>>>>>>> Sorry for the late reply, I missed your previous message. Thanks
>>>>>>>>>> Arvid for the reminder <3.
>>>>>>>>>>
>>>>>>>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and
>>>>>>>>>>> pass those elements to the sink, which would create the tenant-specific
>>>>>>>>>>> Elastic connection from the ConfigurationT element and handle
>>>>>>>>>>> caching it and then just grab the element and send it on its way?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Yes, this is exactly what I had in mind. There should be almost
>>>>>>>>>> no overhead as sink can be easily chained with your join
>>>>>>>>>> (KeyedCoProcessFunction) function.
>>>>>>>>>>
>>>>>>>>>>>    1. The shape of the elements being evicted from the process
>>>>>>>>>>>    function (Is a simple wrapper with the configuration for the sink enough
>>>>>>>>>>>    here? Do I need to explicitly initialize the sink within this function?
>>>>>>>>>>>    Etc.)
>>>>>>>>>>
>>>>>>>>>> To write an element you need a configuration for the
>>>>>>>>>> destination and the element itself, so a tuple of *(ElasticConfiguration,
>>>>>>>>>> Element)* should be enough (that's basically your MessageWrapper<ElementT,
>>>>>>>>>> ConfigurationT> class).
>>>>>>>>>>
>>>>>>>>>>>    2. The actual use of the *DynamicElasticsearchSink* class
>>>>>>>>>>>    (Would it just be something like an
>>>>>>>>>>>    *.addSink(DynamicElasticSearchSink<String, Configuration>())* or
>>>>>>>>>>>    perhaps something else entirely?)
>>>>>>>>>>
>>>>>>>>>> I guess it could look something like the snippet below. It would
>>>>>>>>>> definitely be good to play around with the
>>>>>>>>>> *DynamicElasticSearchSink* API and make it more meaningful /
>>>>>>>>>> user friendly (the gist I've shared was just a very rough prototype to
>>>>>>>>>> showcase the idea).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> static class Destination {
>>>>>>>>>>
>>>>>>>>>>     private final List<HttpHost> httpHosts;
>>>>>>>>>>
>>>>>>>>>>     Destination(List<HttpHost> httpHosts) {
>>>>>>>>>>         this.httpHosts = httpHosts;
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>>>>>>>> toWrite.addSink(
>>>>>>>>>>         new DynamicElasticsearchSink<>(
>>>>>>>>>>                 new SinkRouter<
>>>>>>>>>>                         Tuple2<Destination, String>,
>>>>>>>>>>                         String,
>>>>>>>>>>                         ElasticsearchSink<Tuple2<Destination, String>>>() {
>>>>>>>>>>
>>>>>>>>>>                     @Override
>>>>>>>>>>                     public String getRoute(Tuple2<Destination, String> element) {
>>>>>>>>>>                         // Construct a deterministic unique caching key for the
>>>>>>>>>>                         // destination... (this could be cheaper if you know the data)
>>>>>>>>>>                         return element.f0.httpHosts.stream()
>>>>>>>>>>                                 .map(HttpHost::toHostString)
>>>>>>>>>>                                 .collect(Collectors.joining(","));
>>>>>>>>>>                     }
>>>>>>>>>>
>>>>>>>>>>                     @Override
>>>>>>>>>>                     public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>>>>>>>>>                             String cacheKey, Tuple2<Destination, String> element) {
>>>>>>>>>>                         return new ElasticsearchSink.Builder<>(
>>>>>>>>>>                                         element.f0.httpHosts,
>>>>>>>>>>                                         (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>>>>>>>>>                                                 (el, ctx, indexer) -> {
>>>>>>>>>>                                                     // Construct index request.
>>>>>>>>>>                                                     final IndexRequest request = ...;
>>>>>>>>>>                                                     indexer.add(request);
>>>>>>>>>>                                                 })
>>>>>>>>>>                                 .build();
>>>>>>>>>>                     }
>>>>>>>>>>                 }));
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I hope this helps ;)
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> D.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <
>>>>>>>>>> rionmonster@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>>>>>>>>
>>>>>>>>>>> Since this will vary depending on the elements retrieved from a
>>>>>>>>>>> separate stream, I'm guessing something like the following would be
>>>>>>>>>>> roughly the avenue to continue down:
>>>>>>>>>>>
>>>>>>>>>>> fun main(args: Array<String>) {
>>>>>>>>>>>     val parameters = mergeParametersFromProperties(args)
>>>>>>>>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>>>>>>>
>>>>>>>>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>>>>>>>>     val connectionStream = stream
>>>>>>>>>>>         .fromSource(
>>>>>>>>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>>>>>             "elastic-configs"
>>>>>>>>>>>         )
>>>>>>>>>>>
>>>>>>>>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>>>>>>>>     stream
>>>>>>>>>>>         .fromSource(
>>>>>>>>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>>>>>             "messages"
>>>>>>>>>>>         )
>>>>>>>>>>>         .keyBy { message ->
>>>>>>>>>>>             // Key by the tenant in the message
>>>>>>>>>>>             message.getTenant()
>>>>>>>>>>>         }
>>>>>>>>>>>         .connect(
>>>>>>>>>>>             // Connect the messages stream with the configurations
>>>>>>>>>>>             connectionStream
>>>>>>>>>>>         )
>>>>>>>>>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>>>>>>>>>             // For this key, we need to store all of the previous messages in state
>>>>>>>>>>>             // in the case where we don't have a given mapping for this tenant yet
>>>>>>>>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>>>>>>>>             lateinit var configState: ValueState<String>
>>>>>>>>>>>
>>>>>>>>>>>             override fun open(parameters: Configuration) {
>>>>>>>>>>>                 super.open(parameters)
>>>>>>>>>>>                 // Initialize the states
>>>>>>>>>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>>>>>>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>>>>>>>>             }
>>>>>>>>>>>
>>>>>>>>>>>             // When an element is received
>>>>>>>>>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>>>>>>>>>                 // Check if we have a mapping
>>>>>>>>>>>                 if (configState.value() == null){
>>>>>>>>>>>                     // We don't have a mapping for this tenant, store messages until we get it
>>>>>>>>>>>                     messagesAwaitingConfigState.add(message)
>>>>>>>>>>>                 }
>>>>>>>>>>>                 else {
>>>>>>>>>>>                     // Output the record with some indicator of the route?
>>>>>>>>>>>                     out.collect(message)
>>>>>>>>>>>                 }
>>>>>>>>>>>             }
>>>>>>>>>>>
>>>>>>>>>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>>>>>>>>>                 // If this mapping is for this specific tenant, store it and flush the pending
>>>>>>>>>>>                 // records in state
>>>>>>>>>>>                 if (config.getTenant() == context.currentKey){
>>>>>>>>>>>                     configState.update(config)
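>>>>>>>>>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>>>>>>>>>                     messagesToFlush.forEach { message ->
>>>>>>>>>>>                         out.collect(message)
>>>>>>>>>>>                     }
>>>>>>>>>>>                     // Clear the buffer so flushed messages are not emitted again
>>>>>>>>>>>                     messagesAwaitingConfigState.clear()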
>>>>>>>>>>>                 }
>>>>>>>>>>>             }
>>>>>>>>>>>
>>>>>>>>>>>             // State descriptors
>>>>>>>>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>>>>>>>>                 "messages-awaiting-config",
>>>>>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>>>>>             )
>>>>>>>>>>>
>>>>>>>>>>>             val configStateDesc = ValueStateDescriptor(
>>>>>>>>>>>                 "elastic-config",
>>>>>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>>>>>             )
>>>>>>>>>>>         })
>>>>>>>>>>>
>>>>>>>>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Basically, connect my tenant-specific configuration stream with
>>>>>>>>>>> my incoming messages (keyed by tenant) and buffer them until I have a
>>>>>>>>>>> corresponding configuration (to avoid race-conditions). However, I'm
>>>>>>>>>>> guessing what will happen here is rather than directly outputting the
>>>>>>>>>>> messages from this process function, I'd construct some type of wrapper
>>>>>>>>>>> here with the necessary routing/configuration for the message (obtained via
>>>>>>>>>>> the configuration stream) along with the element, which might be something
>>>>>>>>>>> like a MessageWrapper<ElementT, ConfigurationT> and pass those
>>>>>>>>>>> elements to the sink, which would create the tenant-specific Elastic
>>>>>>>>>>> connection from the ConfigurationT element and handle caching
>>>>>>>>>>> it and then just grab the element and send it on its way?
>>>>>>>>>>>
>>>>>>>>>>> Those are really the only bits I'm stuck on at the moment:
>>>>>>>>>>>
>>>>>>>>>>>    1. The shape of the elements being evicted from the process
>>>>>>>>>>>    function (Is a simple wrapper with the configuration for the sink enough
>>>>>>>>>>>    here? Do I need to explicitly initialize the sink within this function?
>>>>>>>>>>>    Etc.)
>>>>>>>>>>>    2. The actual use of the DynamicElasticsearchSink class
>>>>>>>>>>>    (Would it just be something like an .addSink(DynamicElasticSearchSink<String,
>>>>>>>>>>>    Configuration>()) or perhaps something else entirely?)
>>>>>>>>>>>
>>>>>>>>>>> Thanks again so much for the advice thus far David, it's greatly
>>>>>>>>>>> appreciated.
>>>>>>>>>>>
>>>>>>>>>>> Rion
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <dm...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> To give you a better idea, at a high level I think it could look
>>>>>>>>>>>> something like this
>>>>>>>>>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8>
>>>>>>>>>>>> [1].
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <
>>>>>>>>>>>> rionmonster@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi David,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your response! I think there are currently quite a
>>>>>>>>>>>>> few unknowns on my end in terms of what production loads look like, but I
>>>>>>>>>>>>> think the number of clusters shouldn’t be too large (and will either rarely
>>>>>>>>>>>>> change or have new entries come in at runtime, but it needs to support
>>>>>>>>>>>>> that).
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think the dynamic approach might be a good route to explore
>>>>>>>>>>>>> with actual changes to the Elasticsearch sink as a longer term option. I’m
>>>>>>>>>>>>> not sure what the dynamic one would look like at the moment though, perhaps
>>>>>>>>>>>>> that’s something you’d be able to advise on?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Given that all the records are keyed for a given tenant and I
>>>>>>>>>>>>> would have the mappings stored in state, is it possible that within the
>>>>>>>>>>>>> open() function for this dynamic route to access the state and initialize
>>>>>>>>>>>>> the client there? Or maybe there’s some other approach (such as grouping by
>>>>>>>>>>>>> clusters and dynamically handling indices)?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I’d be happy to give a shot at making the appropriate changes
>>>>>>>>>>>>> to the sink as well, although I’m far from an Elastic expert. If you point
>>>>>>>>>>>>> me in the right direction, I may be able to help out.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks much!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Rion
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <dm...@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Rion,
>>>>>>>>>>>>>
>>>>>>>>>>>>> As you probably already know, for dynamic indices, you can
>>>>>>>>>>>>> simply implement your own ElasticsearchSinkFunction
>>>>>>>>>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>>>>>>>>>> [1], where you can create any request that the Elastic client supports.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The tricky part is how to implement dynamic routing into
>>>>>>>>>>>>> multiple clusters.
>>>>>>>>>>>>> - If the Elastic clusters are known upfront (before submitting the
>>>>>>>>>>>>> job), you can easily create multiple Elastic sinks and prepend them with a
>>>>>>>>>>>>> simple filter (this is basically what the split operator does).
>>>>>>>>>>>>> - If you discover Elastic clusters at runtime, this would
>>>>>>>>>>>>> require some changes to the current ElasticsearchSink implementation. I
>>>>>>>>>>>>> think this may actually be as simple as introducing something like
>>>>>>>>>>>>> DynamicElasticsearchSink, that could dynamically create and manage "child"
>>>>>>>>>>>>> sinks. This would probably require some thought about how to manage
>>>>>>>>>>>>> consumed resources (memory), because the number of child sinks could be
>>>>>>>>>>>>> potentially unbounded. This could of course be simplified if the underlying
>>>>>>>>>>>>> Elastic client already supports that, which I'm not aware of. If you'd like
>>>>>>>>>>>>> to take this path, it would definitely be a great contribution to Flink
>>>>>>>>>>>>> (I'm able to provide some guidance).
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> D.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <
>>>>>>>>>>>>> rionmonster@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi folks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have a use-case that I wanted to initially pose to the
>>>>>>>>>>>>>> mailing list as I’m not terribly familiar with the Elasticsearch connector
>>>>>>>>>>>>>> to ensure I’m not going down the wrong path trying to accomplish this in
>>>>>>>>>>>>>> Flink (or if something downstream might be a better option).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>    - A stream of tenant-specific events
>>>>>>>>>>>>>>    - An HTTP endpoint containing mappings for
>>>>>>>>>>>>>>    tenant-specific Elastic cluster information (as each tenant has its own
>>>>>>>>>>>>>>    specific Elastic cluster/index)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What I’m hoping to accomplish is the following:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>    1. One stream will periodically poll the HTTP endpoint
>>>>>>>>>>>>>>    and store these cluster mappings in state (keyed by tenant with cluster
>>>>>>>>>>>>>>    info as the value)
>>>>>>>>>>>>>>    2. The event stream will be keyed by tenant and connected
>>>>>>>>>>>>>>    to the cluster mappings stream.
>>>>>>>>>>>>>>    3. I’ll need an Elasticsearch sink that can route the
>>>>>>>>>>>>>>    tenant-specific event data to its corresponding cluster/index from the
>>>>>>>>>>>>>>    mapping source.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>>>>>>>>> indices, however I didn’t know if it’s possible to adjust the cluster like
>>>>>>>>>>>>>> I would need on a per-tenant basis or if there’s a better approach here?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any advice would be appreciated.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Rion
>>>>>>>>>>>>>>
>>>>>>>>>>>>>

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

Posted by David Morávek <dm...@apache.org>.
Hi Rion,

thanks for opening the PR. I'll take a look at it this week. I'd also pull
Arvid into this topic to see whether he has any comments.

Best,
D.

On Sat, Sep 4, 2021 at 9:10 PM Rion Williams <ri...@gmail.com> wrote:

> Hi again David et al,
>
> I managed to push an initial pull request for the implementations for the
> DynamicElasticsearchSink and related ElasticsearchSinkRouter last week
> <https://github.com/apache/flink/pull/17061> and made some minor updates
> today with regards to the Javadocs (included code examples, etc.) along
> with a few tests that came to mind. I was hoping to get a few more eyes on
> it and figure out what else might be worth adding/changing/documenting in
> hopes of wrapping this feature up.
>
> Thanks again to everyone in this incredible community for their assistance
> with this, a local implementation of it for a project of mine is working
> like a charm, so I'm hoping it's something that others will be able to
> leverage for their own needs.
>
> Rion
>
> On Thu, Aug 26, 2021 at 11:45 AM David Morávek <dm...@apache.org> wrote:
>
>> Hi Rion,
>>
>> personally I'd start with a unit test in the base module using a test sink
>> implementation. There is already *DummyElasticsearchSink* that you may
>> be able to reuse (just note that we're trying to get rid of Mockito-based
>> tests such as this one).
>>
>> I'm a bit unsure that an integration test would actually test anything extra
>> that the unit test doesn't in this case, so I'd recommend it as the next
>> step (I'm also a bit concerned that this test would take a long time to
>> execute / be resource intensive as it would need to spawn more Elastic
>> clusters?).
>>
>> Best,
>> D.
>>
>> On Thu, Aug 26, 2021 at 5:47 PM Rion Williams <ri...@gmail.com>
>> wrote:
>>
>>> Just chiming in on this again.
>>>
>>> I think I have the pieces in place regarding the implementation (both a
>>> DynamicElasticsearchSink class and ElasticsearchSinkRouter interface) added
>>> to the elasticsearch-base module. I noticed that HttpHost wasn't available
>>> within that module/in the tests, so I'd suspect that I'd need to add a
>>> dependency similar to those found within the specific ES implementations
>>> (5/6/7). I'd also assume that it may be best to just provide a dummy sink
>>> similar to the other patterning to handle writing the unit tests or would
>>> you recommend separate Elasticsearch integration tests using a
>>> TestContainer of each supported version (5/6/7) similar to those within the
>>> ElasticsearchSinkITCase files under each module?
>>>
>>> Any advice / recommendations on this front would be helpful. I want to
>>> write some tests surrounding this that demonstrate the most common
>>> use-cases, but also don't want to go overkill.
>>>
>>> Thanks again for all of your help,
>>>
>>> Rion
>>>
>>> On Wed, Aug 25, 2021 at 2:10 PM Rion Williams <ri...@gmail.com>
>>> wrote:
>>>
>>>> Thanks again David,
>>>>
>>>> I've spun up a JIRA issue for the ticket
>>>> <https://issues.apache.org/jira/browse/FLINK-23977> while I work on
>>>> getting things into the proper state. If someone with the
>>>> appropriate privileges could assign it to me, I'd be appreciative. I'll
>>>> likely need some assistance at a few points to ensure things look as
>>>> expected, but I'm happy to help with this contribution.
>>>>
>>>> Rion
>>>>
>>>> On Wed, Aug 25, 2021 at 11:37 AM David Morávek <dm...@apache.org> wrote:
>>>>
>>>>> AFAIK there are currently no other sources in Flink that can treat
>>>>> "other sources" / "destination" as data. The most complete generic work on
>>>>> this topic that I'm aware of is the Splittable DoFn-based IOs in Apache Beam.
>>>>>
>>>>> I think the best module for the contribution would be
>>>>> "elasticsearch-base", because this could be easily reused for all ES
>>>>> versions that we currently support.
>>>>>
>>>>> Best,
>>>>> D.
>>>>>
>>>>> On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <ri...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi David,
>>>>>>
>>>>>> That was perfect and it looks like this is working as I'd expected. I
>>>>>> put together some larger integration tests for my specific use-case
>>>>>> (multiple Elasticsearch clusters running in TestContainers) and verified
>>>>>> that messages were being routed dynamically to the appropriate sinks. I
>>>>>> forked the Flink repo last night and was trying to figure out the best
>>>>>> place to start adding these classes in (I noticed that there were three
>>>>>> separate ES packages targeting 5/6/7 respectively). I was going to try to
>>>>>> start fleshing the initial implementation for this, but wanted to make sure
>>>>>> that I was starting in the right place.
>>>>>>
>>>>>> Additionally, do you know of anything that might be similar to this
>>>>>> even within other sinks? Just trying to think of something to model this
>>>>>> after. Once I get things started, I'll spin up a JIRA issue for it and go
>>>>>> from there.
>>>>>>
>>>>>> Thanks so much for your help!
>>>>>>
>>>>>> Rion
>>>>>>
>>>>>> On Tue, Aug 24, 2021 at 1:45 AM David Morávek <dm...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Rion,
>>>>>>>
>>>>>>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>>>>>>> before opening the child sink. Please see *AbstractRichFunction*
>>>>>>> [1] (that ElasticsearchSink extends) for more details.
>>>>>>>
>>>>>>> One more note, instead of starting with an integration test, I'd
>>>>>>> recommend writing a unit test using *operator test harness* [2]
>>>>>>> first. This should help you to discover / debug many issues upfront. You
>>>>>>> can use *ElasticsearchSinkBaseTest* [3] as an example.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>>>>>>> [2]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>>>>>>> [3]
>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
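The ordering described above (set the runtime context on the child sink, then open it, then invoke it) can be illustrated with a minimal, dependency-free Java sketch. `ChildSink` and `RoutingSink` are hypothetical stand-ins and not Flink classes; a plain `String` stands in for the runtime context. They only mimic a rich function that refuses to run until its context has been set.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a rich sink: it fails until a context has been set.
class ChildSink {
    private String runtimeContext; // stand-in for a real runtime context object
    private boolean opened;
    final List<String> written = new ArrayList<>();

    void setRuntimeContext(String context) {
        this.runtimeContext = context;
    }

    String getRuntimeContext() {
        if (runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return runtimeContext;
    }

    void open() {
        opened = true;
    }

    void invoke(String element) {
        getRuntimeContext(); // throws if the wrapper never passed its context down
        if (!opened) {
            throw new IllegalStateException("Sink has not been opened.");
        }
        written.add(element);
    }
}

// Hypothetical wrapper that lazily creates one child sink per route.
class RoutingSink {
    private final String runtimeContext;
    final Map<String, ChildSink> children = new HashMap<>();

    RoutingSink(String runtimeContext) {
        this.runtimeContext = runtimeContext;
    }

    void invoke(String route, String element) {
        ChildSink sink = children.get(route);
        if (sink == null) {
            sink = new ChildSink();
            sink.setRuntimeContext(runtimeContext); // must happen before open()
            sink.open();
            children.put(route, sink);
        }
        sink.invoke(element);
    }
}
```

Skipping the `setRuntimeContext` call in `RoutingSink.invoke` reproduces the same kind of failure at the first `sink.invoke(...)`.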
>>>>>>>
>>>>>>> Best,
>>>>>>> D.
>>>>>>>
>>>>>>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <
>>>>>>> rionmonster@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi David,
>>>>>>>>
>>>>>>>> Thanks again for the response, I believe that I'm getting pretty
>>>>>>>> close for at least a POC-level implementation of this. Currently, I'm
>>>>>>>> working with JsonObject instances throughout the pipeline, so I wanted to
>>>>>>>> try this out and simply stored the routing information within the element
>>>>>>>> itself for simplicity's sake right now, so it has a shape that looks
>>>>>>>> something like this:
>>>>>>>>
>>>>>>>> {
>>>>>>>>     "route": {
>>>>>>>>         "hosts": "...",
>>>>>>>>         "index": "...",
>>>>>>>>         ...
>>>>>>>>     },
>>>>>>>>     "all-other-fields-here"
>>>>>>>> }
>>>>>>>>
>>>>>>>> And I've stripped back several of the layers of the routers (since
>>>>>>>> I already have all of the information in the element at that point). I
>>>>>>>> tried using something like this:
>>>>>>>>
>>>>>>>> class DynamicElasticsearchSink: RichSinkFunction<JsonObject>(), CheckpointedFunction {
>>>>>>>>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>>>>>>>     private lateinit var configuration: Configuration
>>>>>>>>
>>>>>>>>     override fun open(parameters: Configuration) {
>>>>>>>>         configuration = parameters
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>>>>>>>>         val route = getHost(element)
>>>>>>>>         // Check if we already have a router for this cluster
>>>>>>>>         var sink = sinkRoutes[route]
>>>>>>>>         if (sink == null) {
>>>>>>>>             // If not, create one
>>>>>>>>             sink = buildSinkFromRoute(element)
>>>>>>>>             sink.open(configuration)
>>>>>>>>             sinkRoutes[route] = sink
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         sink.invoke(element, context)
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     override fun initializeState(context: FunctionInitializationContext) {
>>>>>>>>         // No-op.
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>>>>>>>         // This is used only to flush pending writes.
>>>>>>>>         for (sink in sinkRoutes.values) {
>>>>>>>>             sink.snapshotState(context)
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     override fun close() {
>>>>>>>>         for (sink in sinkRoutes.values) {
>>>>>>>>             sink.close()
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>>>>>>>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>>>>>>>             buildHostsFromElement(element),
>>>>>>>>             ElasticsearchRoutingFunction()
>>>>>>>>         )
>>>>>>>>
>>>>>>>>         builder.setBulkFlushMaxActions(1)
>>>>>>>>
>>>>>>>>         // TODO: Configure authorization if available
>>>>>>>> //        builder.setRestClientFactory { restClient ->
>>>>>>>> //            restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
>>>>>>>> //                override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>>>>>>>> //                    // Configure authorization here
>>>>>>>> //                    val credentialsProvider = BasicCredentialsProvider().apply {
>>>>>>>> //                        setCredentials(
>>>>>>>> //                            AuthScope.ANY,
>>>>>>>> //                            UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
>>>>>>>> //                        )
>>>>>>>> //                    }
>>>>>>>> //
>>>>>>>> //                    return builder.setDefaultCredentialsProvider(credentialsProvider);
>>>>>>>> //                }
>>>>>>>> //            })
>>>>>>>> //        }
>>>>>>>>
>>>>>>>>         return builder.build()
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost>{
>>>>>>>>         val transportAddresses = element
>>>>>>>>             .get("route").asJsonObject
>>>>>>>>             .get("hosts").asString
>>>>>>>>
>>>>>>>>         // If there are multiple, they should be comma-delimited
>>>>>>>>         val addresses = transportAddresses.split(",")
>>>>>>>>         return addresses
>>>>>>>>             .filter { address -> address.isNotEmpty() }
>>>>>>>>             .map { address ->
>>>>>>>>                 HttpHost.create(address)
>>>>>>>>             }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private fun getHost(element: JsonObject): String {
>>>>>>>>         return element
>>>>>>>>             .get("route").asJsonObject
>>>>>>>>             .get("hosts").asString
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private class ElasticsearchRoutingFunction: ElasticsearchSinkFunction<JsonObject> {
>>>>>>>>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>>>>>>>>             indexer.add(request(element))
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         private fun request(element: JsonObject): IndexRequest {
>>>>>>>>             // Access routing information
>>>>>>>>             val index = element
>>>>>>>>                 .get("route").asJsonObject
>>>>>>>>                 .get("index").asString
>>>>>>>>
>>>>>>>>             // Strip off routing information
>>>>>>>>             element.remove("route")
>>>>>>>>
>>>>>>>>             // Send the request
>>>>>>>>             return Requests.indexRequest()
>>>>>>>>                 .index(index)
>>>>>>>>                 .type("_doc")
>>>>>>>>                 .source(mapOf(
>>>>>>>>                     "data" to "$element"
>>>>>>>>                 ))
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> After running an integration test, I keep running into
>>>>>>>> the following error during the invocation of the child sink:
>>>>>>>>
>>>>>>>> // The runtime context has not been initialized.
>>>>>>>> sink.invoke(element, context)
>>>>>>>>
>>>>>>>> I can see the underlying sink getting initialized, the open call
>>>>>>>> being made, etc. However, for some reason it looks like there's an issue
>>>>>>>> related to the context during the invoke call, namely *"The runtime
>>>>>>>> context has not been initialized"*. I had assumed this would be
>>>>>>>> alright since the context for the "wrapper" should have already been
>>>>>>>> initialized, but maybe there's something that I'm missing.
>>>>>>>>
>>>>>>>> Also, please forgive any hastily written or nasty code as this is
>>>>>>>> purely a POC to see if I could get this to work using a single object. I
>>>>>>>> have the hopes of cleaning it up and genericizing it after I am confident
>>>>>>>> that it actually works.
>>>>>>>>
>>>>>>>> Thanks so much again,
>>>>>>>>
>>>>>>>> Rion
>>>>>>>>
>>>>>>>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <dm...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Rion,
>>>>>>>>>
>>>>>>>>> Sorry for the late reply, I missed your previous message. Thanks
>>>>>>>>> Arvid for the reminder <3.
>>>>>>>>>
>>>>>>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and
>>>>>>>>>> pass those elements to the sink, which would create the tenant-specific
>>>>>>>>>> Elastic connection from the ConfigurationT element and handle
>>>>>>>>>> caching it and then just grab the element and send it on its way?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yes, this is exactly what I had in mind. There should be almost no
>>>>>>>>> overhead as the sink can be easily chained with your join
>>>>>>>>> (KeyedCoProcessFunction) function.
>>>>>>>>>
>>>>>>>>>> 1. The shape of the elements being evicted from the process
>>>>>>>>>>    function (Is a simple wrapper with the configuration for the sink enough
>>>>>>>>>>    here? Do I need to explicitly initialize the sink within this function?
>>>>>>>>>>    Etc.)
>>>>>>>>>
>>>>>>>>> To write an element you need a configuration for the
>>>>>>>>> destination and the element itself, so a tuple of *(ElasticConfiguration,
>>>>>>>>> Element)* should be enough (that's basically your MessageWrapper<ElementT,
>>>>>>>>> ConfigurationT> class).
>>>>>>>>>
>>>>>>>>>> 2. The actual use of the *DynamicElasticsearchSink* class (Would
>>>>>>>>>>    it just be something like an *.addSink(DynamicElasticSearchSink<String,
>>>>>>>>>>    Configuration>())* or perhaps something else entirely?)
>>>>>>>>>
>>>>>>>>> I guess it could look something like the snippet below. It would
>>>>>>>>> be definitely good to play around with the
>>>>>>>>> *DynamicElasticSearchSink* API and make it more meaningful / user
>>>>>>>>> friendly (the gist I've shared was just a very rough prototype to showcase
>>>>>>>>> the idea).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> static class Destination {
>>>>>>>>>
>>>>>>>>>     private final List<HttpHost> httpHosts;
>>>>>>>>>
>>>>>>>>>     Destination(List<HttpHost> httpHosts) {
>>>>>>>>>         this.httpHosts = httpHosts;
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>>>>>>> toWrite.addSink(
>>>>>>>>>         new DynamicElasticsearchSink<>(
>>>>>>>>>                 new SinkRouter<
>>>>>>>>>                         Tuple2<Destination, String>,
>>>>>>>>>                         String,
>>>>>>>>>                         ElasticsearchSink<Tuple2<Destination, String>>>() {
>>>>>>>>>
>>>>>>>>>                     @Override
>>>>>>>>>                     public String getRoute(Tuple2<Destination, String> element) {
>>>>>>>>>                         // Construct a deterministic unique caching key for the
>>>>>>>>>                         // destination... (this could be cheaper if you know the data)
>>>>>>>>>                         return element.f0.httpHosts.stream()
>>>>>>>>>                                 .map(HttpHost::toHostString)
>>>>>>>>>                                 .collect(Collectors.joining(","));
>>>>>>>>>                     }
>>>>>>>>>
>>>>>>>>>                     @Override
>>>>>>>>>                     public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>>>>>>>>                             String cacheKey, Tuple2<Destination, String> element) {
>>>>>>>>>                         return new ElasticsearchSink.Builder<>(
>>>>>>>>>                                         element.f0.httpHosts,
>>>>>>>>>                                         (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>>>>>>>>                                                 (el, ctx, indexer) -> {
>>>>>>>>>                                                     // Construct index request.
>>>>>>>>>                                                     final IndexRequest request = ...;
>>>>>>>>>                                                     indexer.add(request);
>>>>>>>>>                                                 })
>>>>>>>>>                                 .build();
>>>>>>>>>                     }
>>>>>>>>>                 }));
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I hope this helps ;)
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> D.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <
>>>>>>>>> rionmonster@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>>>>>>>
>>>>>>>>>> Since this will vary depending on the elements retrieved from a
>>>>>>>>>> separate stream, I'm guessing something like the following would be
>>>>>>>>>> roughly the avenue to continue down:
>>>>>>>>>>
>>>>>>>>>> fun main(args: Array<String>) {
>>>>>>>>>>     val parameters = mergeParametersFromProperties(args)
>>>>>>>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>>>>>>
>>>>>>>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>>>>>>>     val connectionStream = stream
>>>>>>>>>>         .fromSource(
>>>>>>>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>>>>             "elastic-configs"
>>>>>>>>>>         )
>>>>>>>>>>
>>>>>>>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>>>>>>>     stream
>>>>>>>>>>         .fromSource(
>>>>>>>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>>>>             "messages"
>>>>>>>>>>         )
>>>>>>>>>>         .keyBy { message ->
>>>>>>>>>>             // Key by the tenant in the message
>>>>>>>>>>             message.getTenant()
>>>>>>>>>>         }
>>>>>>>>>>         .connect(
>>>>>>>>>>             // Connect the messages stream with the configurations
>>>>>>>>>>             connectionStream
>>>>>>>>>>         )
>>>>>>>>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>>>>>>>>             // For this key, we need to store all of the previous messages in state
>>>>>>>>>>             // in the case where we don't have a given mapping for this tenant yet
>>>>>>>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>>>>>>>             lateinit var configState: ValueState<String>
>>>>>>>>>>
>>>>>>>>>>             override fun open(parameters: Configuration) {
>>>>>>>>>>                 super.open(parameters)
>>>>>>>>>>                 // Initialize the states
>>>>>>>>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>>>>>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>>>>>>>             }
>>>>>>>>>>
>>>>>>>>>>             // When an element is received
>>>>>>>>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>>>>>>>>                 // Check if we have a mapping
>>>>>>>>>>                 if (configState.value() == null){
>>>>>>>>>>                     // We don't have a mapping for this tenant, store messages until we get it
>>>>>>>>>>                     messagesAwaitingConfigState.add(message)
>>>>>>>>>>                 }
>>>>>>>>>>                 else {
>>>>>>>>>>                     // Output the record with some indicator of the route?
>>>>>>>>>>                     out.collect(message)
>>>>>>>>>>                 }
>>>>>>>>>>             }
>>>>>>>>>>
>>>>>>>>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>>>>>>>>                 // If this mapping is for this specific tenant, store it and flush the pending
>>>>>>>>>>                 // records in state
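>>>>>>>>>>                 if (config.getTenant() == context.currentKey){
>>>>>>>>>>                     configState.update(config)
>>>>>>>>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>>>>>>>>                     messagesToFlush.forEach { message ->
>>>>>>>>>>                         out.collect(message)
>>>>>>>>>>                     }
>>>>>>>>>>                     // Clear the buffer so the same messages are not emitted again
>>>>>>>>>>                     // on the next configuration update
>>>>>>>>>>                     messagesAwaitingConfigState.clear()
>>>>>>>>>>                 }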
>>>>>>>>>>             }
>>>>>>>>>>
>>>>>>>>>>             // State descriptors
>>>>>>>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>>>>>>>                 "messages-awaiting-config",
>>>>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>>>>             )
>>>>>>>>>>
>>>>>>>>>>             val configStateDesc = ValueStateDescriptor(
>>>>>>>>>>                 "elastic-config",
>>>>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>>>>             )
>>>>>>>>>>         })
>>>>>>>>>>
>>>>>>>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Basically, connect my tenant-specific configuration stream with
>>>>>>>>>> my incoming messages (keyed by tenant) and buffer them until I have a
>>>>>>>>>> corresponding configuration (to avoid race-conditions). However, I'm
>>>>>>>>>> guessing what will happen here is rather than directly outputting the
>>>>>>>>>> messages from this process function, I'd construct some type of wrapper
>>>>>>>>>> here with the necessary routing/configuration for the message (obtained via
>>>>>>>>>> the configuration stream) along with the element, which might be something
>>>>>>>>>> like a MessageWrapper<ElementT, ConfigurationT> and pass those
>>>>>>>>>> elements to the sink, which would create the tenant-specific Elastic
>>>>>>>>>> connection from the ConfigurationT element and handle caching it
>>>>>>>>>> and then just grab the element and send it on its way?
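The buffering idea in the paragraph above (hold messages per tenant until a configuration arrives, then emit each message together with its configuration) can be modelled in plain Java without any Flink dependency. This is only a sketch under assumptions: `Routed` and `TenantBuffer` are hypothetical names, the two maps stand in for keyed state, and configurations are plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical wrapper pairing a message with the configuration used to route it.
class Routed {
    final String config;
    final String message;

    Routed(String config, String message) {
        this.config = config;
        this.message = message;
    }
}

// Plain-Java model of the per-tenant buffering; the maps stand in for keyed state.
class TenantBuffer {
    private final Map<String, String> configByTenant = new HashMap<>();
    private final Map<String, List<String>> pendingByTenant = new HashMap<>();

    // Called for each message; returns whatever can be emitted right away.
    List<Routed> onMessage(String tenant, String message) {
        List<Routed> out = new ArrayList<>();
        String config = configByTenant.get(tenant);
        if (config == null) {
            // No configuration yet: hold the message back.
            pendingByTenant.computeIfAbsent(tenant, t -> new ArrayList<>()).add(message);
        } else {
            out.add(new Routed(config, message));
        }
        return out;
    }

    // Called for each configuration update; flushes and clears the tenant's buffer.
    List<Routed> onConfig(String tenant, String config) {
        configByTenant.put(tenant, config);
        List<Routed> out = new ArrayList<>();
        List<String> pending = pendingByTenant.remove(tenant);
        if (pending != null) {
            for (String message : pending) {
                out.add(new Routed(config, message));
            }
        }
        return out;
    }
}
```

Removing the buffer on flush matters: otherwise every later configuration update for the same tenant would emit the old messages again.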
>>>>>>>>>>
>>>>>>>>>> Those are really the only bits I'm stuck on at the moment:
>>>>>>>>>>
>>>>>>>>>>    1. The shape of the elements being evicted from the process
>>>>>>>>>>    function (Is a simple wrapper with the configuration for the sink enough
>>>>>>>>>>    here? Do I need to explicitly initialize the sink within this function?
>>>>>>>>>>    Etc.)
>>>>>>>>>>    2. The actual use of the DynamicElasticsearchSink class
>>>>>>>>>>    (Would it just be something like an .addSink(DynamicElasticSearchSink<String,
>>>>>>>>>>    Configuration>()) or perhaps something else entirely?)
>>>>>>>>>>
>>>>>>>>>> Thanks again so much for the advice thus far David, it's greatly
>>>>>>>>>> appreciated.
>>>>>>>>>>
>>>>>>>>>> Rion
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <dm...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> To give you a better idea, at a high level I think it could look
>>>>>>>>>>> something like this
>>>>>>>>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8>
>>>>>>>>>>> [1].
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <
>>>>>>>>>>> rionmonster@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi David,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your response! I think there are currently quite a
>>>>>>>>>>>> few unknowns on my end in terms of what production loads look like, but I
>>>>>>>>>>>> think the number of clusters shouldn’t be too large (and will either rarely
>>>>>>>>>>>> change or have new entries come in at runtime, but it needs to support
>>>>>>>>>>>> that).
>>>>>>>>>>>>
>>>>>>>>>>>> I think the dynamic approach might be a good route to explore
>>>>>>>>>>>> with actual changes to the Elasticsearch sink as a longer term option. I’m
>>>>>>>>>>>> not sure what the dynamic one would look like at the moment though, perhaps
>>>>>>>>>>>> that’s something you’d be able to advise on?
>>>>>>>>>>>>
>>>>>>>>>>>> Given that all the records are keyed for a given tenant and I
>>>>>>>>>>>> would have the mappings stored in state, is it possible that within the
>>>>>>>>>>>> open() function for this dynamic route to access the state and initialize
>>>>>>>>>>>> the client there? Or maybe there’s some other approach (such as grouping by
>>>>>>>>>>>> clusters and dynamically handling indices)?
>>>>>>>>>>>>
>>>>>>>>>>>> I’d be happy to give a shot at making the appropriate changes
>>>>>>>>>>>> to the sink as well, although I’m far from an Elastic expert. If you point
>>>>>>>>>>>> me in the right direction, I may be able to help out.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks much!
>>>>>>>>>>>>
>>>>>>>>>>>> Rion
>>>>>>>>>>>>
>>>>>>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <dm...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Rion,
>>>>>>>>>>>>
>>>>>>>>>>>> As you probably already know, for dynamic indices, you can
>>>>>>>>>>>> simply implement your own ElasticsearchSinkFunction
>>>>>>>>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>>>>>>>>> [1], where you can create any request that the Elastic client supports.
>>>>>>>>>>>>
>>>>>>>>>>>> The tricky part is how to implement dynamic routing into
>>>>>>>>>>>> multiple clusters.
>>>>>>>>>>>> - If the Elastic clusters are known upfront (before submitting the
>>>>>>>>>>>> job), you can easily create multiple Elastic sinks and prepend them with a
>>>>>>>>>>>> simple filter (this is basically what the split operator does).
>>>>>>>>>>>> - If you discover Elastic clusters at runtime, this would
>>>>>>>>>>>> require some changes to the current ElasticsearchSink implementation. I
>>>>>>>>>>>> think this may actually be as simple as introducing something like a
>>>>>>>>>>>> DynamicElasticsearchSink that could dynamically create and manage "child"
>>>>>>>>>>>> sinks. This would probably require some thought about how to manage
>>>>>>>>>>>> consumed resources (memory), because the number of child sinks could be
>>>>>>>>>>>> potentially unbounded. This could of course be simplified if the underlying
>>>>>>>>>>>> Elastic client already supports that, which I'm not aware of. If you'd like
>>>>>>>>>>>> to take this path, it would definitely be a great contribution to Flink
>>>>>>>>>>>> (I'm able to provide some guidance).
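One possible way to keep the number of child sinks bounded, as raised in the second bullet above, is a least-recently-used cache that closes whatever it evicts. The sketch below is plain Java built on `LinkedHashMap`; `BoundedSinkCache`, `maxSinks` and `evictedRoutes` are hypothetical names chosen for illustration and are not part of Flink or of the proposed sink.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical bounded cache of child sinks; the least recently used sink is
// closed and evicted once more than maxSinks entries are held.
class BoundedSinkCache<S extends AutoCloseable> {
    private final Function<String, S> factory;
    private final LinkedHashMap<String, S> sinks;
    final List<String> evictedRoutes = new ArrayList<>();

    BoundedSinkCache(int maxSinks, Function<String, S> factory) {
        this.factory = factory;
        // accessOrder = true keeps entries ordered from least to most recently used.
        this.sinks = new LinkedHashMap<String, S>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, S> eldest) {
                if (size() <= maxSinks) {
                    return false;
                }
                closeQuietly(eldest.getValue());
                evictedRoutes.add(eldest.getKey());
                return true;
            }
        };
    }

    // Returns the cached sink for the route, creating it on first use.
    S get(String route) {
        S sink = sinks.get(route);
        if (sink == null) {
            sink = factory.apply(route);
            sinks.put(route, sink);
        }
        return sink;
    }

    int size() {
        return sinks.size();
    }

    private static void closeQuietly(AutoCloseable sink) {
        try {
            sink.close();
        } catch (Exception e) {
            // A real implementation should log or propagate this.
        }
    }
}
```

A real sink would also have to flush a child's pending requests before evicting it, which this sketch does not model.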
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> D.
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <
>>>>>>>>>>>> rionmonster@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi folks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a use-case that I wanted to initially pose to the
>>>>>>>>>>>>> mailing list as I’m not terribly familiar with the Elasticsearch connector
>>>>>>>>>>>>> to ensure I’m not going down the wrong path trying to accomplish this in
>>>>>>>>>>>>> Flink (or if something downstream might be a better option).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>>>>>>>
>>>>>>>>>>>>>    - A stream of tenant-specific events
>>>>>>>>>>>>>    - An HTTP endpoint containing mappings for tenant-specific
>>>>>>>>>>>>>    Elastic cluster information (as each tenant has its own specific Elastic
>>>>>>>>>>>>>    cluster/index)
>>>>>>>>>>>>>
>>>>>>>>>>>>> What I’m hoping to accomplish is the following:
>>>>>>>>>>>>>
>>>>>>>>>>>>>    1. One stream will periodically poll the HTTP endpoint and
>>>>>>>>>>>>>    store these cluster mappings in state (keyed by tenant with cluster info as
>>>>>>>>>>>>>    the value)
>>>>>>>>>>>>>    2. The event stream will be keyed by tenant and connected
>>>>>>>>>>>>>    to the cluster mappings stream.
>>>>>>>>>>>>>    3. I’ll need an Elasticsearch sink that can route the
>>>>>>>>>>>>>    tenant-specific event data to its corresponding cluster/index from the
>>>>>>>>>>>>>    mapping source.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>>>>>>>> indices, however I didn’t know if it’s possible to adjust the cluster like
>>>>>>>>>>>>> I would need on a per-tenant basis or if there’s a better approach here?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any advice would be appreciated.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Rion
>>>>>>>>>>>>>
>>>>>>>>>>>>

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

Posted by Rion Williams <ri...@gmail.com>.
Hi again David et al,

I managed to push an initial pull request for the implementations for the
DynamicElasticsearchSink and related ElasticsearchSinkRouter last week
<https://github.com/apache/flink/pull/17061> and made some minor updates
today with regards to the Javadocs (included code examples, etc.) along
with a few tests that came to mind. I was hoping to get a few more eyes on
it and figure out what else might be worth adding/changing/documenting in
hopes of wrapping this feature up.

Thanks again to everyone in this incredible community for their assistance
with this, a local implementation of it for a project of mine is working
like a charm, so I'm hoping it's something that others will be able to
leverage for their own needs.

Rion

On Thu, Aug 26, 2021 at 11:45 AM David Morávek <dm...@apache.org> wrote:

> Hi Rion,
>
> personally I'd start with a unit test in the base module using a test sink
> implementation. There is already *DummyElasticsearchSink* that you may be
> able to reuse (just note that we're trying to get rid of Mockito-based
> tests such as this one).
>
> I'm a bit unsure that an integration test would actually test anything extra
> that the unit test doesn't in this case, so I'd recommend it as the next
> step (I'm also a bit concerned that this test would take a long time to
> execute / be resource intensive as it would need to spawn more Elastic
> clusters?).
>
> Best,
> D.
>
> On Thu, Aug 26, 2021 at 5:47 PM Rion Williams <ri...@gmail.com>
> wrote:
>
>> Just chiming in on this again.
>>
>> I think I have the pieces in place regarding the implementation (both a
>> DynamicElasticsearchSink class and ElasticsearchSinkRouter interface) added
>> to the elasticsearch-base module. I noticed that HttpHost wasn't available
>> within that module/in the tests, so I'd suspect that I'd need to add a
>> dependency similar to those found within the specific ES implementations
>> (5/6/7). I'd also assume that it may be best to just provide a dummy sink
>> similar to the other patterning to handle writing the unit tests or would
>> you recommend separate Elasticsearch integration tests using a
>> TestContainer of each supported version (5/6/7) similar to those within the
>> ElasticsearchSinkITCase files under each module?
>>
>> Any advice / recommendations on this front would be helpful. I want to
>> write some tests surrounding this that demonstrate the most common
>> use-cases, but also don't want to go overkill.
>>
>> Thanks again for all of your help,
>>
>> Rion
>>
>> On Wed, Aug 25, 2021 at 2:10 PM Rion Williams <ri...@gmail.com>
>> wrote:
>>
>>> Thanks again David,
>>>
>>> I've spun up a JIRA issue for the ticket
>>> <https://issues.apache.org/jira/browse/FLINK-23977> while I work on
>>> getting things into the proper state. If someone with the
>>> appropriate privileges could assign it to me, I'd be appreciative. I'll
>>> likely need some assistance at a few points to ensure things look as
>>> expected, but I'm happy to help with this contribution.
>>>
>>> Rion
>>>
>>> On Wed, Aug 25, 2021 at 11:37 AM David Morávek <dm...@apache.org> wrote:
>>>
>>>> AFAIK there are currently no other sources in Flink that can treat
>>>> "other sources" / "destination" as data. The most complete generic work on this
>>>> topic that I'm aware of is the Splittable DoFn based IOs in Apache Beam.
>>>>
>>>> I think the best module for the contribution would be
>>>> "elasticsearch-base", because this could be easily reused for all ES
>>>> versions that we currently support.
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <ri...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> That was perfect and it looks like this is working as I'd expected. I
>>>>> put together some larger integration tests for my specific use-case
>>>>> (multiple Elasticsearch clusters running in TestContainers) and verified
>>>>> that messages were being routed dynamically to the appropriate sinks. I
>>>>> forked the Flink repo last night and was trying to figure out the best
>>>>> place to start adding these classes in (I noticed that there were three
>>>>> separate ES packages targeting 5/6/7 respectively). I was going to try to
>>>>> start fleshing out the initial implementation for this, but wanted to make sure
>>>>> that I was starting in the right place.
>>>>>
>>>>> Additionally, do you know of anything that might be similar to this
>>>>> even within other sinks? Just trying to think of something to model this
>>>>> after. Once I get things started, I'll spin up a JIRA issue for it and go
>>>>> from there.
>>>>>
>>>>> Thanks so much for your help!
>>>>>
>>>>> Rion
>>>>>
>>>>> On Tue, Aug 24, 2021 at 1:45 AM David Morávek <dm...@apache.org> wrote:
>>>>>
>>>>>> Hi Rion,
>>>>>>
>>>>>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>>>>>> before opening the child sink. Please see *AbstractRichFunction* [1]
>>>>>> (that ElasticsearchSink extends) for more details.
>>>>>>
>>>>>> One more note, instead of starting with integration test, I'd
>>>>>> recommend writing a unit test using *operator test harness* [2]
>>>>>> first. This should help you to discover / debug many issues upfront. You
>>>>>> can use *ElasticsearchSinkBaseTest* [3] as an example.
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>>>>>> [2]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>>>>>> [3]
>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
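The ordering described above can be illustrated without any Flink dependency. What follows is only a minimal sketch under stated assumptions: `RichFunctionModel`, `ChildSinkModel` and `WrapperSinkModel` are hypothetical stand-ins, not Flink classes, and a plain string stands in for the runtime context. The model merely mimics a rich function that throws until a context has been set, and shows the wrapper handing its own context to each child before opening it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Dependency-free model; none of these types are Flink classes.
class RuntimeContextOrderDemo {
    public static void main(String[] args) {
        WrapperSinkModel wrapper = new WrapperSinkModel();
        wrapper.setRuntimeContext("subtask-0");
        wrapper.invoke("cluster-a", "event-1");
        System.out.println(wrapper.child("cluster-a").written);
    }
}

// Mimics the rich-function contract: the context is unavailable until it is set.
abstract class RichFunctionModel {
    private String runtimeContext; // hypothetical stand-in for a real runtime context

    void setRuntimeContext(String context) {
        this.runtimeContext = context;
    }

    String getRuntimeContext() {
        if (runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return runtimeContext;
    }
}

class ChildSinkModel extends RichFunctionModel {
    final List<String> written = new ArrayList<>();
    boolean opened;

    void open() {
        opened = true;
    }

    void invoke(String element) {
        // The child reads its context on every write, so it must have been set.
        written.add(getRuntimeContext() + ":" + element);
    }
}

class WrapperSinkModel extends RichFunctionModel {
    private final Map<String, ChildSinkModel> children = new HashMap<>();

    void invoke(String route, String element) {
        ChildSinkModel child = children.get(route);
        if (child == null) {
            child = new ChildSinkModel();
            // Hand over the wrapper's context first, then open the child.
            child.setRuntimeContext(getRuntimeContext());
            child.open();
            children.put(route, child);
        }
        child.invoke(element);
    }

    ChildSinkModel child(String route) {
        return children.get(route);
    }
}
```

In this model, a child that is opened without the `setRuntimeContext` call fails on its first `invoke` with the same message quoted in the thread, which is the symptom the advice above addresses.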
>>>>>>
>>>>>> Best,
>>>>>> D.
>>>>>>
>>>>>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <ri...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> Thanks again for the response, I believe that I'm getting pretty
>>>>>>> close for at least a POC-level implementation of this. Currently, I'm
>>>>>>> working with JsonObject instances throughout the pipeline, so I wanted to
>>>>>>> try this out and simply stored the routing information within the element
>>>>>>> itself for simplicity's sake right now, so it has a shape that looks
>>>>>>> something like this:
>>>>>>>
>>>>>>> {
>>>>>>>     "route": {
>>>>>>>         "hosts": "...",
>>>>>>>         "index": "...",
>>>>>>>         ...
>>>>>>>     },
>>>>>>>     "all-other-fields-here"
>>>>>>> }
>>>>>>>
>>>>>>> And I've stripped back several of the layers of the routers (since I
>>>>>>> already have all of the information in the element at that point). I tried
>>>>>>> using something like this:
>>>>>>>
>>>>>>> class DynamicElasticsearchSink: RichSinkFunction<JsonObject>(), CheckpointedFunction {
>>>>>>>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>>>>>>     private lateinit var configuration: Configuration
>>>>>>>
>>>>>>>     override fun open(parameters: Configuration) {
>>>>>>>         configuration = parameters
>>>>>>>     }
>>>>>>>
>>>>>>>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>>>>>>>         val route = getHost(element)
>>>>>>>         // Check if we already have a router for this cluster
>>>>>>>         var sink = sinkRoutes[route]
>>>>>>>         if (sink == null) {
>>>>>>>             // If not, create one
>>>>>>>             sink = buildSinkFromRoute(element)
>>>>>>>             sink.open(configuration)
>>>>>>>             sinkRoutes[route] = sink
>>>>>>>         }
>>>>>>>
>>>>>>>         sink.invoke(element, context)
>>>>>>>     }
>>>>>>>
>>>>>>>     override fun initializeState(context: FunctionInitializationContext) {
>>>>>>>         // No-op.
>>>>>>>     }
>>>>>>>
>>>>>>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>>>>>>         // This is used only to flush pending writes.
>>>>>>>         for (sink in sinkRoutes.values) {
>>>>>>>             sink.snapshotState(context)
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     override fun close() {
>>>>>>>         for (sink in sinkRoutes.values) {
>>>>>>>             sink.close()
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>>>>>>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>>>>>>             buildHostsFromElement(element),
>>>>>>>             ElasticsearchRoutingFunction()
>>>>>>>         )
>>>>>>>
>>>>>>>         builder.setBulkFlushMaxActions(1)
>>>>>>>
>>>>>>>         // TODO: Configure authorization if available
>>>>>>> //        builder.setRestClientFactory { restClient ->
>>>>>>> //            restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
>>>>>>> //                override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>>>>>>> //                    // Configure authorization here
>>>>>>> //                    val credentialsProvider = BasicCredentialsProvider().apply {
>>>>>>> //                        setCredentials(
>>>>>>> //                            AuthScope.ANY,
>>>>>>> //                            UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
>>>>>>> //                        )
>>>>>>> //                    }
>>>>>>> //
>>>>>>> //                    return builder.setDefaultCredentialsProvider(credentialsProvider);
>>>>>>> //                }
>>>>>>> //            })
>>>>>>> //        }
>>>>>>>
>>>>>>>         return builder.build()
>>>>>>>     }
>>>>>>>
>>>>>>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost>{
>>>>>>>         val transportAddresses = element
>>>>>>>             .get("route").asJsonObject
>>>>>>>             .get("hosts").asString
>>>>>>>
>>>>>>>         // If there are multiple, they should be comma-delimited
>>>>>>>         val addresses = transportAddresses.split(",")
>>>>>>>         return addresses
>>>>>>>             .filter { address -> address.isNotEmpty() }
>>>>>>>             .map { address ->
>>>>>>>                 HttpHost.create(address)
>>>>>>>             }
>>>>>>>     }
>>>>>>>
>>>>>>>     private fun getHost(element: JsonObject): String {
>>>>>>>         return element
>>>>>>>             .get("route").asJsonObject
>>>>>>>             .get("hosts").asString
>>>>>>>     }
>>>>>>>
>>>>>>>     private class ElasticsearchRoutingFunction: ElasticsearchSinkFunction<JsonObject> {
>>>>>>>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>>>>>>>             indexer.add(request(element))
>>>>>>>         }
>>>>>>>
>>>>>>>         private fun request(element: JsonObject): IndexRequest {
>>>>>>>             // Access routing information
>>>>>>>             val index = element
>>>>>>>                 .get("route").asJsonObject
>>>>>>>                 .get("index").asString
>>>>>>>
>>>>>>>             // Strip off routing information
>>>>>>>             element.remove("route")
>>>>>>>
>>>>>>>             // Send the request
>>>>>>>             return Requests.indexRequest()
>>>>>>>                 .index(index)
>>>>>>>                 .type("_doc")
>>>>>>>                 .source(mapOf(
>>>>>>>                     "data" to "$element"
>>>>>>>                 ))
>>>>>>>         }
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> After running an integration test, I keep running into
>>>>>>> the following error during the invocation of the child sink:
>>>>>>>
>>>>>>> // The runtime context has not been initialized.
>>>>>>> sink.invoke(element, context)
>>>>>>>
>>>>>>> I can see the underlying sink getting initialized, the open call
>>>>>>> being made, etc.; however, for some reason it looks like there's an issue
>>>>>>> related to the context during the invoke call, namely *"The runtime
>>>>>>> context has not been initialized"*. I had assumed this would be
>>>>>>> alright since the context for the "wrapper" should have already been
>>>>>>> initialized, but maybe there's something that I'm missing.
>>>>>>>
>>>>>>> Also, please forgive any hastily written or nasty code as this is
>>>>>>> purely a POC to see if I could get this to work using a single object. I
>>>>>>> have the hopes of cleaning it up and genericizing it after I am confident
>>>>>>> that it actually works.
>>>>>>>
>>>>>>> Thanks so much again,
>>>>>>>
>>>>>>> Rion
>>>>>>>
>>>>>>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <dm...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Rion,
>>>>>>>>
>>>>>>>> Sorry for the late reply; I missed your previous message. Thanks
>>>>>>>> Arvid for the reminder <3.
>>>>>>>>
>>>>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass
>>>>>>>>> those elements to the sink, which would create the tenant-specific Elastic
>>>>>>>>> connection from the ConfigurationT element and handle caching it
>>>>>>>>> and then just grab the element and send it on its way?
>>>>>>>>
>>>>>>>>
>>>>>>>> Yes, this is exactly what I had in mind. There should be almost no
>>>>>>>> overhead as sink can be easily chained with your join
>>>>>>>> (KeyedCoProcessFunction) function.
>>>>>>>>
>>>>>>>>>    The shape of the elements being evicted from the process
>>>>>>>>>    function (Is a simple wrapper with the configuration for the sink enough
>>>>>>>>>    here? Do I need to explicitly initialize the sink within this function?
>>>>>>>>>    Etc.)
>>>>>>>>
>>>>>>>> To write an element you need a configuration for the
>>>>>>>> destination and the element itself, so a tuple of *(ElasticConfiguration,
>>>>>>>> Element)* should be enough (that's basically your MessageWrapper<ElementT,
>>>>>>>> ConfigurationT> class).
>>>>>>>>
>>>>>>>>>    The actual use of the *DynamicElasticsearchSink* class (Would
>>>>>>>>>    it just be something like an
>>>>>>>>>    *.addSink(DynamicElasticSearchSink<String, Configuration>())* or
>>>>>>>>>    perhaps something else entirely?)
>>>>>>>>
>>>>>>>> I guess it could look something like the snippet below. It would
>>>>>>>> definitely be good to play around with the *DynamicElasticSearchSink*
>>>>>>>> API and make it more meaningful / user friendly (the gist I've shared was
>>>>>>>> just a very rough prototype to showcase the idea).
>>>>>>>>
>>>>>>>>
>>>>>>>> static class Destination {
>>>>>>>>
>>>>>>>>     private final List<HttpHost> httpHosts;
>>>>>>>>
>>>>>>>>     Destination(List<HttpHost> httpHosts) {
>>>>>>>>         this.httpHosts = httpHosts;
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>>>>>> toWrite.addSink(
>>>>>>>>         new DynamicElasticsearchSink<>(
>>>>>>>>                 new SinkRouter<
>>>>>>>>                         Tuple2<Destination, String>,
>>>>>>>>                         String,
>>>>>>>>                         ElasticsearchSink<Tuple2<Destination, String>>>() {
>>>>>>>>
>>>>>>>>                     @Override
>>>>>>>>                     public String getRoute(Tuple2<Destination, String> element) {
>>>>>>>>                         // Construct a deterministic unique caching key for the
>>>>>>>>                         // destination... (this could be cheaper if you know the data)
>>>>>>>>                         return element.f0.httpHosts.stream()
>>>>>>>>                                 .map(HttpHost::toHostString)
>>>>>>>>                                 .collect(Collectors.joining(","));
>>>>>>>>                     }
>>>>>>>>
>>>>>>>>                     @Override
>>>>>>>>                     public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>>>>>>>                             String cacheKey, Tuple2<Destination, String> element) {
>>>>>>>>                         return new ElasticsearchSink.Builder<>(
>>>>>>>>                                         element.f0.httpHosts,
>>>>>>>>                                         (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>>>>>>>                                                 (el, ctx, indexer) -> {
>>>>>>>>                                                     // Construct index request.
>>>>>>>>                                                     final IndexRequest request = ...;
>>>>>>>>                                                     indexer.add(request);
>>>>>>>>                                                 })
>>>>>>>>                                 .build();
>>>>>>>>                     }
>>>>>>>>                 }));
>>>>>>>>
>>>>>>>>
>>>>>>>> I hope this helps ;)
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> D.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <
>>>>>>>> rionmonster@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>>>>>>
>>>>>>>>> Since this will vary depending on the elements retrieved from a
>>>>>>>>> separate stream, I'm guessing something like the following would be
>>>>>>>>> roughly the avenue to continue down:
>>>>>>>>>
>>>>>>>>> fun main(args: Array<String>) {
>>>>>>>>>     val parameters = mergeParametersFromProperties(args)
>>>>>>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>>>>>
>>>>>>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>>>>>>     val connectionStream = stream
>>>>>>>>>         .fromSource(
>>>>>>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>>>             "elastic-configs"
>>>>>>>>>         )
>>>>>>>>>
>>>>>>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>>>>>>     stream
>>>>>>>>>         .fromSource(
>>>>>>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>>>             "messages"
>>>>>>>>>         )
>>>>>>>>>         .keyBy { message ->
>>>>>>>>>             // Key by the tenant in the message
>>>>>>>>>             message.getTenant()
>>>>>>>>>         }
>>>>>>>>>         .connect(
>>>>>>>>>             // Connect the messages stream with the configurations
>>>>>>>>>             connectionStream
>>>>>>>>>         )
>>>>>>>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>>>>>>>             // For this key, we need to store all of the previous messages in state
>>>>>>>>>             // in the case where we don't have a given mapping for this tenant yet
>>>>>>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>>>>>>             lateinit var configState: ValueState<String>
>>>>>>>>>
>>>>>>>>>             override fun open(parameters: Configuration) {
>>>>>>>>>                 super.open(parameters)
>>>>>>>>>                 // Initialize the states
>>>>>>>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>>>>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>>>>>>             }
>>>>>>>>>
>>>>>>>>>             // When an element is received
>>>>>>>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>>>>>>>                 // Check if we have a mapping
>>>>>>>>>                 if (configState.value() == null){
>>>>>>>>>                     // We don't have a mapping for this tenant, store messages until we get it
>>>>>>>>>                     messagesAwaitingConfigState.add(message)
>>>>>>>>>                 }
>>>>>>>>>                 else {
>>>>>>>>>                     // Output the record with some indicator of the route?
>>>>>>>>>                     out.collect(message)
>>>>>>>>>                 }
>>>>>>>>>             }
>>>>>>>>>
>>>>>>>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>>>>>>>                 // If this mapping is for this specific tenant, store it and flush the pending
>>>>>>>>>                 // records in state
>>>>>>>>>                 if (config.getTenant() == context.currentKey){
>>>>>>>>>                     configState.update(config)
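>>>>>>>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>>>>>>>                     messagesToFlush.forEach { message ->
>>>>>>>>>                         out.collect(message)
>>>>>>>>>                     }
>>>>>>>>>                     // Clear the buffer so flushed messages are not emitted again
>>>>>>>>>                     messagesAwaitingConfigState.clear()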
>>>>>>>>>                 }
>>>>>>>>>             }
>>>>>>>>>
>>>>>>>>>             // State descriptors
>>>>>>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>>>>>>                 "messages-awaiting-config",
>>>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>>>             )
>>>>>>>>>
>>>>>>>>>             val configStateDesc = ValueStateDescriptor(
>>>>>>>>>                 "elastic-config",
>>>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>>>             )
>>>>>>>>>         })
>>>>>>>>>
>>>>>>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Basically, connect my tenant-specific configuration stream with my
>>>>>>>>> incoming messages (keyed by tenant) and buffer them until I have a
>>>>>>>>> corresponding configuration (to avoid race-conditions). However, I'm
>>>>>>>>> guessing what will happen here is rather than directly outputting the
>>>>>>>>> messages from this process function, I'd construct some type of wrapper
>>>>>>>>> here with the necessary routing/configuration for the message (obtained via
>>>>>>>>> the configuration stream) along with the element, which might be something
>>>>>>>>> like a MessageWrapper<ElementT, ConfigurationT> and pass those
>>>>>>>>> elements to the sink, which would create the tenant-specific Elastic
>>>>>>>>> connection from the ConfigurationT element and handle caching it
>>>>>>>>> and then just grab the element and send it on its way?
>>>>>>>>>
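The buffer-then-wrap idea in the paragraph above can be sketched without Flink. This is only a rough model under stated assumptions: `TenantBuffer` and `Routed` are hypothetical names, plain in-memory maps stand in for keyed state, and strings stand in for both the message and the configuration. Messages wait until a configuration for their tenant arrives, then leave paired with that configuration, and the buffer is emptied so nothing is emitted twice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Dependency-free model; plain maps stand in for per-tenant keyed state.
class BufferUntilConfiguredDemo {
    public static void main(String[] args) {
        TenantBuffer buffer = new TenantBuffer();
        System.out.println(buffer.onMessage("tenant-a", "m1"));
        System.out.println(buffer.onConfig("tenant-a", "http://es-a:9200"));
        System.out.println(buffer.onMessage("tenant-a", "m2"));
    }
}

// The wrapper handed to the sink: the element plus the route configuration.
class Routed {
    final String config;
    final String element;

    Routed(String config, String element) {
        this.config = config;
        this.element = element;
    }

    @Override
    public String toString() {
        return config + " <- " + element;
    }
}

class TenantBuffer {
    private final Map<String, String> configByTenant = new HashMap<>();
    private final Map<String, List<String>> pendingByTenant = new HashMap<>();

    // Called for each incoming message; returns what can be emitted right now.
    List<Routed> onMessage(String tenant, String message) {
        List<Routed> out = new ArrayList<>();
        String config = configByTenant.get(tenant);
        if (config == null) {
            // No mapping yet: hold the message until one arrives.
            pendingByTenant.computeIfAbsent(tenant, t -> new ArrayList<>()).add(message);
        } else {
            out.add(new Routed(config, message));
        }
        return out;
    }

    // Called for each configuration update; flushes anything that was waiting.
    List<Routed> onConfig(String tenant, String config) {
        configByTenant.put(tenant, config);
        List<Routed> out = new ArrayList<>();
        // remove() empties the buffer so a later update does not re-emit.
        List<String> pending = pendingByTenant.remove(tenant);
        if (pending != null) {
            for (String message : pending) {
                out.add(new Routed(config, message));
            }
        }
        return out;
    }
}
```

The one design point worth noting is that the pending list is removed when it is flushed; without that, a second configuration update for the same tenant would emit the buffered messages again.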
>>>>>>>>> Those are really the only bits I'm stuck on at the moment:
>>>>>>>>>
>>>>>>>>>    1. The shape of the elements being evicted from the process
>>>>>>>>>    function (Is a simple wrapper with the configuration for the sink enough
>>>>>>>>>    here? Do I need to explicitly initialize the sink within this function?
>>>>>>>>>    Etc.)
>>>>>>>>>    2. The actual use of the DynamicElasticsearchSink class (Would
>>>>>>>>>    it just be something like an .addSink(DynamicElasticSearchSink<String,
>>>>>>>>>    Configuration>()) or perhaps something else entirely?)
>>>>>>>>>
>>>>>>>>> Thanks again so much for the advice thus far David, it's greatly
>>>>>>>>> appreciated.
>>>>>>>>>
>>>>>>>>> Rion
>>>>>>>>>
>>>>>>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <dm...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> To give you a better idea, at a high level I think it could look
>>>>>>>>>> something like this
>>>>>>>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8>
>>>>>>>>>> [1].
>>>>>>>>>>
>>>>>>>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <
>>>>>>>>>> rionmonster@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi David,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your response! I think there are currently quite a
>>>>>>>>>>> few unknowns on my end in terms of what production loads look like but I
>>>>>>>>>>> think the number of clusters shouldn’t be too large (and will either rarely
>>>>>>>>>>> change or have new entries come in at runtime, but it needs to support
>>>>>>>>>>> that).
>>>>>>>>>>>
>>>>>>>>>>> I think the dynamic approach might be a good route to explore
>>>>>>>>>>> with actual changes to the Elasticsearch sink as a longer term option. I’m
>>>>>>>>>>> not sure what the dynamic one would look like at the moment though, perhaps
>>>>>>>>>>> that’s something you’d be able to advise on?
>>>>>>>>>>>
>>>>>>>>>>> Given that all the records are keyed for a given tenant and I
>>>>>>>>>>> would have the mappings stored in state, is it possible that within the
>>>>>>>>>>> open() function for this dynamic route to access the state and initialize
>>>>>>>>>>> the client there? Or maybe there’s some other approach (such as grouping by
>>>>>>>>>>> clusters and dynamically handling indices)?
>>>>>>>>>>>
>>>>>>>>>>> I’d be happy to give a shot at making the appropriate changes to
>>>>>>>>>>> the sink as well, although I’m far from an Elastic expert. If you point me
>>>>>>>>>>> in the right direction, I may be able to help out.
>>>>>>>>>>>
>>>>>>>>>>> Thanks much!
>>>>>>>>>>>
>>>>>>>>>>> Rion
>>>>>>>>>>>
>>>>>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <dm...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> 
>>>>>>>>>>> Hi Rion,
>>>>>>>>>>>
>>>>>>>>>>> As you probably already know, for dynamic indices, you can
>>>>>>>>>>> simply implement your own ElasticsearchSinkFunction
>>>>>>>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>>>>>>>> [1], where you can create any request that the Elastic client supports.
>>>>>>>>>>>
>>>>>>>>>>> The tricky part is how to implement dynamic routing into
>>>>>>>>>>> multiple clusters.
>>>>>>>>>>> - If the Elastic clusters are known upfront (before submitting the
>>>>>>>>>>> job), you can easily create multiple Elastic sinks and prepend them with a
>>>>>>>>>>> simple filter (this is basically what the split operator does).
>>>>>>>>>>> - If you discover Elastic clusters at runtime, this would
>>>>>>>>>>> require some changes to the current ElasticsearchSink implementation. I
>>>>>>>>>>> think this may actually be as simple as introducing something like
>>>>>>>>>>> DynamicElasticsearchSink, that could dynamically create and manage "child"
>>>>>>>>>>> sinks. This would probably require some thought about how to manage
>>>>>>>>>>> consumed resources (memory), because the number of child sinks could be
>>>>>>>>>>> potentially unbounded. This could of course be simplified if the underlying
>>>>>>>>>>> Elastic client already supports that, which I'm not aware of. If you'd like
>>>>>>>>>>> to take this path, it would definitely be a great contribution to Flink
>>>>>>>>>>> (I'm able to provide some guidance).
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
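On the resource concern raised above (a potentially unbounded number of child sinks), one possible policy is a small least-recently-used cache that closes a child when it is evicted. The sketch below is only an illustration of that policy and is not taken from the thread or from Flink: `BoundedSinkCache` and `ChildSinkModel` are hypothetical names, and a real child sink would need to flush pending writes in `close()`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Dependency-free model of an LRU cache of child sinks.
class BoundedSinkCacheDemo {
    public static void main(String[] args) {
        BoundedSinkCache cache = new BoundedSinkCache(2);
        cache.get("a");
        cache.get("b");
        cache.get("a"); // touching "a" makes "b" the least recently used
        cache.get("c"); // exceeds the limit, so the eldest entry is evicted
        System.out.println("evicted: " + cache.evictedRoutes);
    }
}

// Hypothetical stand-in for a child sink bound to one cluster.
class ChildSinkModel {
    final String route;
    boolean closed;

    ChildSinkModel(String route) {
        this.route = route;
    }

    void close() {
        closed = true; // a real sink would flush pending writes here
    }
}

class BoundedSinkCache {
    final List<String> evictedRoutes = new ArrayList<>();
    private final LinkedHashMap<String, ChildSinkModel> sinks;

    BoundedSinkCache(final int maxSinks) {
        // accessOrder = true keeps the least recently used entry first.
        this.sinks = new LinkedHashMap<String, ChildSinkModel>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, ChildSinkModel> eldest) {
                if (size() > maxSinks) {
                    eldest.getValue().close();
                    evictedRoutes.add(eldest.getKey());
                    return true;
                }
                return false;
            }
        };
    }

    ChildSinkModel get(String route) {
        ChildSinkModel sink = sinks.get(route); // get() counts as an access
        if (sink == null) {
            sink = new ChildSinkModel(route);
            sinks.put(route, sink); // may trigger eviction of the eldest entry
        }
        return sink;
    }

    int size() {
        return sinks.size();
    }
}
```

Whether eviction is acceptable depends on the workload: an evicted route that becomes active again pays the cost of reconnecting, so the limit would need to be sized against the expected number of concurrently active clusters.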
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> D.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <
>>>>>>>>>>> rionmonster@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi folks,
>>>>>>>>>>>>
>>>>>>>>>>>> I have a use-case that I wanted to initially pose to the
>>>>>>>>>>>> mailing list as I’m not terribly familiar with the Elasticsearch connector
>>>>>>>>>>>> to ensure I’m not going down the wrong path trying to accomplish this in
>>>>>>>>>>>> Flink (or if something downstream might be a better option).
>>>>>>>>>>>>
>>>>>>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>>>>>>
>>>>>>>>>>>>    - A stream of tenant-specific events
>>>>>>>>>>>>    - An HTTP endpoint containing mappings for tenant-specific
>>>>>>>>>>>>    Elastic cluster information (as each tenant has its own specific Elastic
>>>>>>>>>>>>    cluster/index)
>>>>>>>>>>>>
>>>>>>>>>>>> What I’m hoping to accomplish is the following:
>>>>>>>>>>>>
>>>>>>>>>>>>    1. One stream will periodically poll the HTTP endpoint and
>>>>>>>>>>>>    store these cluster mappings in state (keyed by tenant with cluster info as
>>>>>>>>>>>>    the value)
>>>>>>>>>>>>    2. The event stream will be keyed by tenant and connected
>>>>>>>>>>>>    to the cluster mappings stream.
>>>>>>>>>>>>    3. I’ll need an Elasticsearch sink that can route the
>>>>>>>>>>>>    tenant-specific event data to its corresponding cluster/index from the
>>>>>>>>>>>>    mapping source.
>>>>>>>>>>>>
>>>>>>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>>>>>>> indices, however I didn’t know if it’s possible to adjust the cluster like
>>>>>>>>>>>> I would need on a per-tenant basis or if there’s a better approach here?
>>>>>>>>>>>>
>>>>>>>>>>>> Any advice would be appreciated.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>
>>>>>>>>>>>> Rion
>>>>>>>>>>>>
>>>>>>>>>>>

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

Posted by David Morávek <dm...@apache.org>.
Hi Rion,

personally I'd start with a unit test in the base module using a test sink
implementation. There is already *DummyElasticsearchSink* that you may be
able to reuse (just note that we're trying to get rid of Mockito-based
tests such as this one).

I'm a bit unsure that an integration test would actually test anything extra
that the unit test doesn't in this case, so I'd recommend it as the next
step (I'm also a bit concerned that this test would take a long time to
execute / be resource intensive as it would need to spawn more elastic
clusters?).
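
As a rough illustration of what such a unit test could check, here is a dependency-free sketch. Everything in it is an assumption made for the example: `RouterModel`, `RecordingSink`, `DynamicSinkModel`, `CountingRouter` and `Event` are hypothetical names, not the classes in the PR or in Flink. The idea is only that a recording test sink lets you assert that one child is created per route and that each element lands in the child for its route.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Dependency-free model of routing elements to per-route test sinks.
class RouterUnitTestDemo {
    public static void main(String[] args) {
        CountingRouter router = new CountingRouter();
        DynamicSinkModel<Event> sink = new DynamicSinkModel<>(router);
        sink.invoke(new Event("es-a", "e1"));
        sink.invoke(new Event("es-b", "e2"));
        sink.invoke(new Event("es-a", "e3"));
        System.out.println("child sinks created: " + router.createCalls);
    }
}

class Event {
    final String route;
    final String payload;

    Event(String route, String payload) {
        this.route = route;
        this.payload = payload;
    }
}

// Hypothetical router contract: derive a cache key and build a child for it.
interface RouterModel<E> {
    String getRoute(E element);

    RecordingSink<E> createSink(String route, E element);
}

// Test sink that only remembers what it was asked to write.
class RecordingSink<E> {
    final List<E> received = new ArrayList<>();

    void invoke(E element) {
        received.add(element);
    }
}

class DynamicSinkModel<E> {
    private final RouterModel<E> router;
    private final Map<String, RecordingSink<E>> sinks = new HashMap<>();

    DynamicSinkModel(RouterModel<E> router) {
        this.router = router;
    }

    void invoke(E element) {
        String route = router.getRoute(element);
        RecordingSink<E> sink = sinks.get(route);
        if (sink == null) {
            // First element for this route: build and cache the child.
            sink = router.createSink(route, element);
            sinks.put(route, sink);
        }
        sink.invoke(element);
    }
}

// Router used by the test; it counts how many children were built.
class CountingRouter implements RouterModel<Event> {
    final Map<String, RecordingSink<Event>> created = new HashMap<>();
    int createCalls;

    @Override
    public String getRoute(Event element) {
        return element.route;
    }

    @Override
    public RecordingSink<Event> createSink(String route, Event element) {
        createCalls++;
        RecordingSink<Event> sink = new RecordingSink<>();
        created.put(route, sink);
        return sink;
    }
}
```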

Best,
D.

On Thu, Aug 26, 2021 at 5:47 PM Rion Williams <ri...@gmail.com> wrote:

> Just chiming in on this again.
>
> I think I have the pieces in place regarding the implementation (both a
> DynamicElasticsearchSink class and ElasticsearchSinkRouter interface) added
> to the elasticsearch-base module. I noticed that HttpHost wasn't available
> within that module/in the tests, so I'd suspect that I'd need to add a
> dependency similar to those found within the specific ES implementations
> (5/6/7). I'd also assume that it may be best to just provide a dummy sink
> similar to the other patterning to handle writing the unit tests or would
> you recommend separate Elasticsearch integration tests using a
> TestContainer of each supported version (5/6/7) similar to those within the
> ElasticsearchSinkITCase files under each module?
>
> Any advice / recommendations on this front would be helpful. I want to
> write some tests surrounding this that demonstrate the most common
> use-cases, but also don't want to go overkill.
>
> Thanks again for all of your help,
>
> Rion
>
> On Wed, Aug 25, 2021 at 2:10 PM Rion Williams <ri...@gmail.com>
> wrote:
>
>> Thanks again David,
>>
>> I've spun up a JIRA issue for the ticket
>> <https://issues.apache.org/jira/browse/FLINK-23977> while I work on
>> getting things into the proper state. If someone with the
>> appropriate privileges could assign it to me, I'd be appreciative. I'll
>> likely need some assistance at a few points to ensure things look as
>> expected, but I'm happy to help with this contribution.
>>
>> Rion
>>
>> On Wed, Aug 25, 2021 at 11:37 AM David Morávek <dm...@apache.org> wrote:
>>
>>> AFAIK there are currently no other sources in Flink that can treat
>>> "other sources" / "destination" as data. The most complete generic work on this
>>> topic that I'm aware of is the Splittable DoFn based IOs in Apache Beam.
>>>
>>> I think the best module for the contribution would be
>>> "elasticsearch-base", because this could be easily reused for all ES
>>> versions that we currently support.
>>>
>>> Best,
>>> D.
>>>
>>> On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <ri...@gmail.com>
>>> wrote:
>>>
>>>> Hi David,
>>>>
>>>> That was perfect and it looks like this is working as I'd expected. I
>>>> put together some larger integration tests for my specific use-case
>>>> (multiple Elasticsearch clusters running in TestContainers) and verified
>>>> that messages were being routed dynamically to the appropriate sinks. I
>>>> forked the Flink repo last night and was trying to figure out the best
>>>> place to start adding these classes in (I noticed that there were three
>>>> separate ES packages targeting 5/6/7 respectively). I was going to try to
>>>> start fleshing out the initial implementation for this, but wanted to make sure
>>>> that I was starting in the right place.
>>>>
>>>> Additionally, do you know of anything that might be similar to this
>>>> even within other sinks? Just trying to think of something to model this
>>>> after. Once I get things started, I'll spin up a JIRA issue for it and go
>>>> from there.
>>>>
>>>> Thanks so much for your help!
>>>>
>>>> Rion
>>>>
>>>> On Tue, Aug 24, 2021 at 1:45 AM David Morávek <dm...@apache.org> wrote:
>>>>
>>>>> Hi Rion,
>>>>>
>>>>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>>>>> before opening the child sink. Please see *AbstractRichFunction* [1]
>>>>> (that ElasticsearchSink extends) for more details.
>>>>>
>>>>> One more note, instead of starting with integration test, I'd
>>>>> recommend writing a unit test using *operator test harness* [2]
>>>>> first. This should help you to discover / debug many issues upfront. You
>>>>> can use *ElasticsearchSinkBaseTest* [3] as an example.
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>>>>> [3]
>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>>>>>
>>>>> Best,
>>>>> D.
>>>>>
>>>>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <ri...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi David,
>>>>>>
>>>>>> Thanks again for the response, I believe that I'm getting pretty
>>>>>> close for at least a POC-level implementation of this. Currently, I'm
>>>>>> working with JsonObject instances throughout the pipeline, so I wanted to
>>>>>> try this out and simply stored the routing information within the element
>>>>>> itself for simplicity's sake right now, so it has a shape that looks
>>>>>> something like this:
>>>>>>
>>>>>> {
>>>>>>     "route": {
>>>>>>         "hosts": "...",
>>>>>>         "index": "...",
>>>>>>         ...
>>>>>>     },
>>>>>>     "all-other-fields-here"
>>>>>> }
>>>>>>
>>>>>> And I've stripped back several of the layers of the routers (since I
>>>>>> already have all of the information in the element at that point). I tried
>>>>>> using something like this:
>>>>>>
>>>>>> class DynamicElasticsearchSink: RichSinkFunction<JsonObject>(), CheckpointedFunction {
>>>>>>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>>>>>     private lateinit var configuration: Configuration
>>>>>>
>>>>>>     override fun open(parameters: Configuration) {
>>>>>>         configuration = parameters
>>>>>>     }
>>>>>>
>>>>>>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>>>>>>         val route = getHost(element)
>>>>>>         // Check if we already have a router for this cluster
>>>>>>         var sink = sinkRoutes[route]
>>>>>>         if (sink == null) {
>>>>>>             // If not, create one
>>>>>>             sink = buildSinkFromRoute(element)
>>>>>>             sink.open(configuration)
>>>>>>             sinkRoutes[route] = sink
>>>>>>         }
>>>>>>
>>>>>>         sink.invoke(element, context)
>>>>>>     }
>>>>>>
>>>>>>     override fun initializeState(context: FunctionInitializationContext) {
>>>>>>         // No-op.
>>>>>>     }
>>>>>>
>>>>>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>>>>>         // This is used only to flush pending writes.
>>>>>>         for (sink in sinkRoutes.values) {
>>>>>>             sink.snapshotState(context)
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     override fun close() {
>>>>>>         for (sink in sinkRoutes.values) {
>>>>>>             sink.close()
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>>>>>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>>>>>             buildHostsFromElement(element),
>>>>>>             ElasticsearchRoutingFunction()
>>>>>>         )
>>>>>>
>>>>>>         builder.setBulkFlushMaxActions(1)
>>>>>>
>>>>>>         // TODO: Configure authorization if available
>>>>>> //        builder.setRestClientFactory { restClient ->
>>>>>> //            restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
>>>>>> //                override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>>>>>> //                    // Configure authorization here
>>>>>> //                    val credentialsProvider = BasicCredentialsProvider().apply {
>>>>>> //                        setCredentials(
>>>>>> //                            AuthScope.ANY,
>>>>>> //                            UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
>>>>>> //                        )
>>>>>> //                    }
>>>>>> //
>>>>>> //                    return builder.setDefaultCredentialsProvider(credentialsProvider);
>>>>>> //                }
>>>>>> //            })
>>>>>> //        }
>>>>>>
>>>>>>         return builder.build()
>>>>>>     }
>>>>>>
>>>>>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost>{
>>>>>>         val transportAddresses = element
>>>>>>             .get("route").asJsonObject
>>>>>>             .get("hosts").asString
>>>>>>
>>>>>>         // If there are multiple, they should be comma-delimited
>>>>>>         val addresses = transportAddresses.split(",")
>>>>>>         return addresses
>>>>>>             .filter { address -> address.isNotEmpty() }
>>>>>>             .map { address ->
>>>>>>                 HttpHost.create(address)
>>>>>>             }
>>>>>>     }
>>>>>>
>>>>>>     private fun getHost(element: JsonObject): String {
>>>>>>         return element
>>>>>>             .get("route").asJsonObject
>>>>>>             .get("hosts").asString
>>>>>>     }
>>>>>>
>>>>>>     private class ElasticsearchRoutingFunction: ElasticsearchSinkFunction<JsonObject> {
>>>>>>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>>>>>>             indexer.add(request(element))
>>>>>>         }
>>>>>>
>>>>>>         private fun request(element: JsonObject): IndexRequest {
>>>>>>             // Access routing information
>>>>>>             val index = element
>>>>>>                 .get("route").asJsonObject
>>>>>>                 .get("index").asString
>>>>>>
>>>>>>             // Strip off routing information
>>>>>>             element.remove("route")
>>>>>>
>>>>>>             // Send the request
>>>>>>             return Requests.indexRequest()
>>>>>>                 .index(index)
>>>>>>                 .type("_doc")
>>>>>>                 .source(mapOf(
>>>>>>                     "data" to "$element"
>>>>>>                 ))
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> After running an integration test, I keep running into the following
>>>>>> error during the invocation of the child sink:
>>>>>>
>>>>>> // The runtime context has not been initialized.
>>>>>> sink.invoke(element, context)
>>>>>>
>>>>>> I can see the underlying sink getting initialized, the open call
>>>>>> being made, etc. However, for some reason there seems to be an issue
>>>>>> with the context during the invoke call, namely *"The runtime context
>>>>>> has not been initialized"*. I had assumed this would be alright since
>>>>>> the context for the "wrapper" should have already been initialized, but
>>>>>> maybe there's something that I'm missing.
>>>>>>
>>>>>> Also, please forgive any hastily written or nasty code as this is
>>>>>> purely a POC to see if I could get this to work using a single object. I
>>>>>> have the hopes of cleaning it up and genericizing it after I am confident
>>>>>> that it actually works.
>>>>>>
>>>>>> Thanks so much again,
>>>>>>
>>>>>> Rion
>>>>>>
>>>>>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <dm...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Rion,
>>>>>>>
>>>>>>> Sorry for the late reply, I missed your previous message. Thanks
>>>>>>> Arvid for the reminder <3.
>>>>>>>
>>>>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass
>>>>>>>> those elements to the sink, which would create the tenant-specific Elastic
>>>>>>>> connection from the ConfigurationT element and handle caching it
>>>>>>>> and then just grab the element and send it on its way?
>>>>>>>
>>>>>>>
>>>>>>> Yes, this is exactly what I had in mind. There should be almost no
>>>>>>> overhead as sink can be easily chained with your join
>>>>>>> (KeyedCoProcessFunction) function.
>>>>>>>
>>>>>>>>    1. The shape of the elements being evicted from the process
>>>>>>>>    function (Is a simple wrapper with the configuration for the sink enough
>>>>>>>>    here? Do I need to explicitly initialize the sink within this function?
>>>>>>>>    Etc.)
>>>>>>>
>>>>>>> To write an element you need a configuration for the destination and
>>>>>>> the element itself, so a tuple of *(ElasticConfiguration, Element)*
>>>>>>> should be enough (that's basically your MessageWrapper<ElementT,
>>>>>>> ConfigurationT> class).
>>>>>>>
>>>>>>>>    2. The actual use of the *DynamicElasticsearchSink* class (Would
>>>>>>>>    it just be something like an
>>>>>>>>    *.addSink(DynamicElasticSearchSink<String, Configuration>())* or
>>>>>>>>    perhaps something else entirely?)
>>>>>>>
>>>>>>> I guess it could look something like the snippet below. It would be
>>>>>>> definitely good to play around with the *DynamicElasticSearchSink*
>>>>>>> API and make it more meaningful / user friendly (the gist I've shared was
>>>>>>> just a very rough prototype to showcase the idea).
>>>>>>>
>>>>>>>
>>>>>>> static class Destination {
>>>>>>>
>>>>>>>     private final List<HttpHost> httpHosts;
>>>>>>>
>>>>>>>     Destination(List<HttpHost> httpHosts) {
>>>>>>>         this.httpHosts = httpHosts;
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>>>>> toWrite.addSink(
>>>>>>>         new DynamicElasticsearchSink<>(
>>>>>>>                 new SinkRouter<
>>>>>>>                         Tuple2<Destination, String>,
>>>>>>>                         String,
>>>>>>>                         ElasticsearchSink<Tuple2<Destination, String>>>() {
>>>>>>>
>>>>>>>                     @Override
>>>>>>>                     public String getRoute(Tuple2<Destination, String> element) {
>>>>>>>                         // Construct a deterministic unique caching key for the
>>>>>>>                         // destination... (this could be cheaper if you know the data)
>>>>>>>                         return element.f0.httpHosts.stream()
>>>>>>>                                 .map(HttpHost::toHostString)
>>>>>>>                                 .collect(Collectors.joining(","));
>>>>>>>                     }
>>>>>>>
>>>>>>>                     @Override
>>>>>>>                     public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>>>>>>                             String cacheKey, Tuple2<Destination, String> element) {
>>>>>>>                         return new ElasticsearchSink.Builder<>(
>>>>>>>                                         element.f0.httpHosts,
>>>>>>>                                         (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>>>>>>                                                 (el, ctx, indexer) -> {
>>>>>>>                                                     // Construct index request.
>>>>>>>                                                     final IndexRequest request = ...;
>>>>>>>                                                     indexer.add(request);
>>>>>>>                                                 })
>>>>>>>                                 .build();
>>>>>>>                     }
>>>>>>>                 }));
>>>>>>>
>>>>>>>
>>>>>>> I hope this helps ;)
>>>>>>>
>>>>>>> Best,
>>>>>>> D.
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <ri...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>>>>>
>>>>>>>> Since this will vary depending on the elements retrieved from a
>>>>>>>> separate stream, I'm guessing something like the following would be
>>>>>>>> roughly the avenue to continue down:
>>>>>>>>
>>>>>>>> fun main(args: Array<String>) {
>>>>>>>>     val parameters = mergeParametersFromProperties(args)
>>>>>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>>>>
>>>>>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>>>>>     val connectionStream = stream
>>>>>>>>         .fromSource(
>>>>>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>>             "elastic-configs"
>>>>>>>>         )
>>>>>>>>
>>>>>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>>>>>     stream
>>>>>>>>         .fromSource(
>>>>>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>>             "messages"
>>>>>>>>         )
>>>>>>>>         .keyBy { message ->
>>>>>>>>             // Key by the tenant in the message
>>>>>>>>             message.getTenant()
>>>>>>>>         }
>>>>>>>>         .connect(
>>>>>>>>             // Connect the messages stream with the configurations
>>>>>>>>             connectionStream
>>>>>>>>         )
>>>>>>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>>>>>>             // For this key, we need to store all of the previous messages in state
>>>>>>>>             // in the case where we don't have a given mapping for this tenant yet
>>>>>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>>>>>             lateinit var configState: ValueState<String>
>>>>>>>>
>>>>>>>>             override fun open(parameters: Configuration) {
>>>>>>>>                 super.open(parameters)
>>>>>>>>                 // Initialize the states
>>>>>>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>>>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>>>>>             }
>>>>>>>>
>>>>>>>>             // When an element is received
>>>>>>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>>>>>>                 // Check if we have a mapping
>>>>>>>>                 if (configState.value() == null){
>>>>>>>>                     // We don't have a mapping for this tenant, store messages until we get it
>>>>>>>>                     messagesAwaitingConfigState.add(message)
>>>>>>>>                 }
>>>>>>>>                 else {
>>>>>>>>                     // Output the record with some indicator of the route?
>>>>>>>>                     out.collect(message)
>>>>>>>>                 }
>>>>>>>>             }
>>>>>>>>
>>>>>>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>>>>>>                 // If this mapping is for this specific tenant, store it and flush the pending
>>>>>>>>                 // records in state
>>>>>>>>                 if (config.getTenant() == context.currentKey){
>>>>>>>>                     configState.update(config)
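>>>>>>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>>>>>>                     messagesToFlush.forEach { message ->
>>>>>>>>                         out.collect(message)
>>>>>>>>                     }
>>>>>>>>                     // Drop the buffered messages so they aren't emitted again
>>>>>>>>                     messagesAwaitingConfigState.clear()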
>>>>>>>>                 }
>>>>>>>>             }
>>>>>>>>
>>>>>>>>             // State descriptors
>>>>>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>>>>>                 "messages-awaiting-config",
>>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>>             )
>>>>>>>>
>>>>>>>>             val configStateDesc = ValueStateDescriptor(
>>>>>>>>                 "elastic-config",
>>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>>             )
>>>>>>>>         })
>>>>>>>>
>>>>>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>>>>>> }
>>>>>>>>
>>>>>>>> Basically, connect my tenant-specific configuration stream with my
>>>>>>>> incoming messages (keyed by tenant) and buffer them until I have a
>>>>>>>> corresponding configuration (to avoid race-conditions). However, I'm
>>>>>>>> guessing what will happen here is rather than directly outputting the
>>>>>>>> messages from this process function, I'd construct some type of wrapper
>>>>>>>> here with the necessary routing/configuration for the message (obtained via
>>>>>>>> the configuration stream) along with the element, which might be something
>>>>>>>> like a MessageWrapper<ElementT, ConfigurationT> and pass those
>>>>>>>> elements to the sink, which would create the tenant-specific Elastic
>>>>>>>> connection from the ConfigurationT element and handle caching it
>>>>>>>> and then just grab the element and send it on its way?
>>>>>>>>
>>>>>>>> Those are really the only bits I'm stuck on at the moment:
>>>>>>>>
>>>>>>>>    1. The shape of the elements being evicted from the process
>>>>>>>>    function (Is a simple wrapper with the configuration for the sink enough
>>>>>>>>    here? Do I need to explicitly initialize the sink within this function?
>>>>>>>>    Etc.)
>>>>>>>>    2. The actual use of the DynamicElasticsearchSink class (Would
>>>>>>>>    it just be something like an .addSink(DynamicElasticSearchSink<String,
>>>>>>>>    Configuration>()) or perhaps something else entirely?)
>>>>>>>>
>>>>>>>> Thanks again so much for the advice thus far David, it's greatly
>>>>>>>> appreciated.
>>>>>>>>
>>>>>>>> Rion
>>>>>>>>
>>>>>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <dm...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> To give you a better idea, at a high level I think it could look
>>>>>>>>> something like this gist [1].
>>>>>>>>>
>>>>>>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>>>>>
>>>>>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <
>>>>>>>>> rionmonster@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi David,
>>>>>>>>>>
>>>>>>>>>> Thanks for your response! I think there are currently quite a few
>>>>>>>>>> unknowns on my end in terms of what production loads look like, but I
>>>>>>>>>> think the number of clusters shouldn’t be too large (and will either rarely
>>>>>>>>>> change or have new entries come in at runtime, but it needs to support
>>>>>>>>>> that).
>>>>>>>>>>
>>>>>>>>>> I think the dynamic approach might be a good route to explore
>>>>>>>>>> with actual changes to the Elasticsearch sink as a longer term option. I’m
>>>>>>>>>> not sure what the dynamic one would look like at the moment though, perhaps
>>>>>>>>>> that’s something you’d be able to advise on?
>>>>>>>>>>
>>>>>>>>>> Given that all the records are keyed for a given tenant and I
>>>>>>>>>> would have the mappings stored in state, is it possible that within the
>>>>>>>>>> open() function for this dynamic route to access the state and initialize
>>>>>>>>>> the client there? Or maybe there’s some other approach (such as grouping by
>>>>>>>>>> clusters and dynamically handling indices)?
>>>>>>>>>>
>>>>>>>>>> I’d be happy to give a shot at making the appropriate changes to
>>>>>>>>>> the sink as well, although I’m far from an Elastic expert. If you point me
>>>>>>>>>> in the right direction, I may be able to help out.
>>>>>>>>>>
>>>>>>>>>> Thanks much!
>>>>>>>>>>
>>>>>>>>>> Rion
>>>>>>>>>>
>>>>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <dm...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> 
>>>>>>>>>> Hi Rion,
>>>>>>>>>>
>>>>>>>>>> As you probably already know, for dynamic indices, you can simply
>>>>>>>>>> implement your own ElasticsearchSinkFunction [1], where you can create
>>>>>>>>>> any request that the elastic client supports.
>>>>>>>>>>
>>>>>>>>>> The tricky part is how to implement dynamic routing into multiple
>>>>>>>>>> clusters.
>>>>>>>>>> - If the elastic clusters are known upfront (before submitting the
>>>>>>>>>> job), you can easily create multiple elastic sinks and prepend each with a
>>>>>>>>>> simple filter (this is basically what split operator does).
>>>>>>>>>> - If you discover elastic clusters at runtime, this would
>>>>>>>>>> require some changes to the current ElasticsearchSink implementation. I
>>>>>>>>>> think this may actually be as simple as introducing something like
>>>>>>>>>> DynamicElasticsearchSink, that could dynamically create and manage "child"
>>>>>>>>>> sinks. This would probably require some thought about how to manage
>>>>>>>>>> consumed resources (memory), because the number of child sinks could be
>>>>>>>>>> potentially unbounded. This could of course be simplified if the underlying
>>>>>>>>>> elastic client already supports that, which I'm not aware of. If you'd like
>>>>>>>>>> to take this path, it would definitely be a great contribution to Flink
>>>>>>>>>> (I'm able to provide some guidance).
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> D.
>>>>>>>>>>
>>>>>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <
>>>>>>>>>> rionmonster@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi folks,
>>>>>>>>>>>
>>>>>>>>>>> I have a use-case that I wanted to initially pose to the mailing
>>>>>>>>>>> list as I’m not terribly familiar with the Elasticsearch connector to
>>>>>>>>>>> ensure I’m not going down the wrong path trying to accomplish this in Flink
>>>>>>>>>>> (or if something downstream might be a better option).
>>>>>>>>>>>
>>>>>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>>>>>
>>>>>>>>>>>    - A stream of tenant-specific events
>>>>>>>>>>>    - An HTTP endpoint containing mappings for tenant-specific
>>>>>>>>>>>    Elastic cluster information (as each tenant has its own specific Elastic
>>>>>>>>>>>    cluster/index)
>>>>>>>>>>>
>>>>>>>>>>> What I’m hoping to accomplish is the following:
>>>>>>>>>>>
>>>>>>>>>>>    1. One stream will periodically poll the HTTP endpoint and
>>>>>>>>>>>    store these cluster mappings in state (keyed by tenant with cluster info as
>>>>>>>>>>>    the value)
>>>>>>>>>>>    2. The event stream will be keyed by tenant and connected to
>>>>>>>>>>>    the cluster mappings stream.
>>>>>>>>>>>    3. I’ll need an Elasticsearch sink that can route the
>>>>>>>>>>>    tenant-specific event data to its corresponding cluster/index from the
>>>>>>>>>>>    mapping source.
>>>>>>>>>>>
>>>>>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>>>>>> indices, however I didn’t know if it’s possible to adjust the cluster like
>>>>>>>>>>> I would need on a per-tenant basis or if there’s a better approach here?
>>>>>>>>>>>
>>>>>>>>>>> Any advice would be appreciated.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Rion
>>>>>>>>>>>
>>>>>>>>>>

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

Posted by Rion Williams <ri...@gmail.com>.
Just chiming in on this again.

I think I have the pieces in place regarding the implementation (both a
DynamicElasticsearchSink class and an ElasticsearchSinkRouter interface), added
to the elasticsearch-base module. I noticed that HttpHost wasn't available
within that module or its tests, so I suspect I'd need to add a
dependency similar to those found within the version-specific ES
implementations (5/6/7). I'd also assume that it may be best to just provide
a dummy sink, following the pattern of the existing tests, to handle the unit
tests. Or would you recommend separate Elasticsearch integration tests using a
TestContainer for each supported version (5/6/7), similar to those within the
ElasticsearchSinkITCase files under each module?
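
For illustration only, the dummy-sink option could look roughly like the
stand-alone Java sketch below. It deliberately uses neither Flink nor the
Elasticsearch client: Routed, ChildSink, Router, DynamicSink and RecordingSink
are hypothetical names made up for this sketch, not classes from Flink or from
the proposed change. It only shows the idea under test, namely one lazily
created and opened child sink per route, with a dummy child that records what
it receives.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DynamicSinkSketch {

    /** Hypothetical element: a payload plus the routing info it travels with. */
    static final class Routed {
        final String hosts;
        final String payload;

        Routed(String hosts, String payload) {
            this.hosts = hosts;
            this.payload = payload;
        }
    }

    /** Hypothetical stand-in for a per-cluster sink; not a Flink interface. */
    interface ChildSink {
        void open();
        void invoke(Routed element);
        void close();
    }

    /** Hypothetical router: derives a cache key and builds the sink for a new key. */
    interface Router {
        String getRoute(Routed element);
        ChildSink createSink(String route, Routed element);
    }

    /** Keeps one child sink per route and opens it lazily on first use. */
    static final class DynamicSink {
        private final Router router;
        private final Map<String, ChildSink> sinks = new LinkedHashMap<>();

        DynamicSink(Router router) {
            this.router = router;
        }

        void invoke(Routed element) {
            String route = router.getRoute(element);
            ChildSink sink = sinks.get(route);
            if (sink == null) {
                sink = router.createSink(route, element);
                sink.open(); // the child must be initialised before its first invoke
                sinks.put(route, sink);
            }
            sink.invoke(element);
        }

        void close() {
            for (ChildSink sink : sinks.values()) {
                sink.close();
            }
            sinks.clear();
        }
    }

    /** Dummy sink for tests: records payloads instead of talking to a cluster. */
    static final class RecordingSink implements ChildSink {
        private final List<String> received;
        private boolean opened;

        RecordingSink(List<String> received) {
            this.received = received;
        }

        @Override public void open() { opened = true; }

        @Override public void invoke(Routed element) {
            if (!opened) {
                throw new IllegalStateException("sink used before open()");
            }
            received.add(element.payload);
        }

        @Override public void close() { opened = false; }
    }

    /** Sends the elements through a DynamicSink and returns the payloads per route. */
    static Map<String, List<String>> route(List<Routed> elements) {
        Map<String, List<String>> written = new TreeMap<>();
        DynamicSink sink = new DynamicSink(new Router() {
            @Override public String getRoute(Routed element) { return element.hosts; }

            @Override public ChildSink createSink(String route, Routed element) {
                return new RecordingSink(written.computeIfAbsent(route, r -> new ArrayList<>()));
            }
        });
        for (Routed element : elements) {
            sink.invoke(element);
        }
        sink.close();
        return written;
    }

    public static void main(String[] args) {
        System.out.println(route(List.of(
                new Routed("es-a:9200", "a1"),
                new Routed("es-b:9200", "b1"),
                new Routed("es-a:9200", "a2"))));
    }
}
```

A unit test in this style only has to assert on what each dummy child
recorded, which keeps it independent of the Elasticsearch version; the
per-version integration tests would then only need to cover the real client
wiring.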

Any advice / recommendations on this front would be helpful. I want to
write some tests surrounding this that demonstrate the most common
use-cases, but also don't want to go overkill.

Thanks again for all of your help,

Rion

On Wed, Aug 25, 2021 at 2:10 PM Rion Williams <ri...@gmail.com> wrote:

> Thanks again David,
>
> I've spun up a JIRA issue for the ticket
> <https://issues.apache.org/jira/browse/FLINK-23977> while I work on
> getting things into the proper state. If someone with the
> appropriate privileges could assign it to me, I'd be appreciative. I'll
> likely need some assistance at a few points to ensure things look as
> expected, but I'm happy to help with this contribution.
>
> Rion
>
> On Wed, Aug 25, 2021 at 11:37 AM David Morávek <dm...@apache.org> wrote:
>
>> AFAIK there are currently no other sources in Flink that can treat "other
>> sources" / "destination" as data. The most complete generic work on this
>> topic that I'm aware of is the Splittable DoFn based IOs in Apache Beam.
>>
>> I think the best module for the contribution would be
>> "elasticsearch-base", because this could be easily reused for all ES
>> versions that we currently support.
>>
>> Best,
>> D.
>>
>> On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <ri...@gmail.com>
>> wrote:
>>
>>> Hi David,
>>>
>>> That was perfect and it looks like this is working as I'd expected. I
>>> put together some larger integration tests for my specific use-case
>>> (multiple Elasticsearch clusters running in TestContainers) and verified
>>> that messages were being routed dynamically to the appropriate sinks. I
>>> forked the Flink repo last night and was trying to figure out the best
>>> place to start adding these classes in (I noticed that there were three
>>> separate ES packages targeting 5/6/7 respectively). I was going to try to
>>> start fleshing the initial implementation for this, but wanted to make sure
>>> that I was starting in the right place.
>>>
>>> Additionally, do you know of anything that might be similar to this even
>>> within other sinks? Just trying to think of something to model this after.
>>> Once I get things started, I'll spin up a JIRA issue for it and go from
>>> there.
>>>
>>> Thanks so much for your help!
>>>
>>> Rion
>>>
>>> On Tue, Aug 24, 2021 at 1:45 AM David Morávek <dm...@apache.org> wrote:
>>>
>>>> Hi Rion,
>>>>
>>>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>>>> before opening the child sink. Please see *AbstractRichFunction* [1]
>>>> (that ElasticsearchSink extends) for more details.
>>>>
>>>> One more note, instead of starting with integration test, I'd recommend
>>>> writing a unit test using *operator test harness* [2] first. This
>>>> should help you to discover / debug many issues upfront. You can use
>>>> *ElasticsearchSinkBaseTest* [3] as an example.
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>>>> [3]
>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <ri...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> Thanks again for the response, I believe that I'm getting pretty close
>>>>> for at least a POC-level implementation of this. Currently, I'm working
>>>>> with JsonObject instances throughout the pipeline, so I wanted to try this
>>>>> out and simply stored the routing information within the element itself for
>>>>> simplicity's sake right now, so it has a shape that looks something like
>>>>> this:
>>>>>
>>>>> {
>>>>>     "route": {
>>>>>         "hosts": "...",
>>>>>         "index": "...",
>>>>>         ...
>>>>>     },
>>>>>     "all-other-fields-here"
>>>>> }
>>>>>
>>>>> And I've stripped back several of the layers of the routers (since I
>>>>> already have all of the information in the element at that point). I tried
>>>>> using something like this:
>>>>>
>>>>> class DynamicElasticsearchSink: RichSinkFunction<JsonObject>(), CheckpointedFunction {
>>>>>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>>>>     private lateinit var configuration: Configuration
>>>>>
>>>>>     override fun open(parameters: Configuration) {
>>>>>         configuration = parameters
>>>>>     }
>>>>>
>>>>>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>>>>>         val route = getHost(element)
>>>>>         // Check if we already have a router for this cluster
>>>>>         var sink = sinkRoutes[route]
>>>>>         if (sink == null) {
>>>>>             // If not, create one
>>>>>             sink = buildSinkFromRoute(element)
>>>>>             sink.open(configuration)
>>>>>             sinkRoutes[route] = sink
>>>>>         }
>>>>>
>>>>>         sink.invoke(element, context)
>>>>>     }
>>>>>
>>>>>     override fun initializeState(context: FunctionInitializationContext) {
>>>>>         // No-op.
>>>>>     }
>>>>>
>>>>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>>>>         // This is used only to flush pending writes.
>>>>>         for (sink in sinkRoutes.values) {
>>>>>             sink.snapshotState(context)
>>>>>         }
>>>>>     }
>>>>>
>>>>>     override fun close() {
>>>>>         for (sink in sinkRoutes.values) {
>>>>>             sink.close()
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>>>>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>>>>             buildHostsFromElement(element),
>>>>>             ElasticsearchRoutingFunction()
>>>>>         )
>>>>>
>>>>>         builder.setBulkFlushMaxActions(1)
>>>>>
>>>>>         // TODO: Configure authorization if available
>>>>> //        builder.setRestClientFactory { restClient ->
>>>>> //            restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
>>>>> //                override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>>>>> //                    // Configure authorization here
>>>>> //                    val credentialsProvider = BasicCredentialsProvider().apply {
>>>>> //                        setCredentials(
>>>>> //                            AuthScope.ANY,
>>>>> //                            UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
>>>>> //                        )
>>>>> //                    }
>>>>> //
>>>>> //                    return builder.setDefaultCredentialsProvider(credentialsProvider);
>>>>> //                }
>>>>> //            })
>>>>> //        }
>>>>>
>>>>>         return builder.build()
>>>>>     }
>>>>>
>>>>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost>{
>>>>>         val transportAddresses = element
>>>>>             .get("route").asJsonObject
>>>>>             .get("hosts").asString
>>>>>
>>>>>         // If there are multiple, they should be comma-delimited
>>>>>         val addresses = transportAddresses.split(",")
>>>>>         return addresses
>>>>>             .filter { address -> address.isNotEmpty() }
>>>>>             .map { address ->
>>>>>                 HttpHost.create(address)
>>>>>             }
>>>>>     }
>>>>>
>>>>>     private fun getHost(element: JsonObject): String {
>>>>>         return element
>>>>>             .get("route").asJsonObject
>>>>>             .get("hosts").asString
>>>>>     }
>>>>>
>>>>>     private class ElasticsearchRoutingFunction: ElasticsearchSinkFunction<JsonObject> {
>>>>>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>>>>>             indexer.add(request(element))
>>>>>         }
>>>>>
>>>>>         private fun request(element: JsonObject): IndexRequest {
>>>>>             // Access routing information
>>>>>             val index = element
>>>>>                 .get("route").asJsonObject
>>>>>                 .get("index").asString
>>>>>
>>>>>             // Strip off routing information
>>>>>             element.remove("route")
>>>>>
>>>>>             // Send the request
>>>>>             return Requests.indexRequest()
>>>>>                 .index(index)
>>>>>                 .type("_doc")
>>>>>                 .source(mapOf(
>>>>>                     "data" to "$element"
>>>>>                 ))
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> After running an integration test, I keep running into the following
>>>>> error during the invocation of the child sink:
>>>>>
>>>>> // The runtime context has not been initialized.
>>>>> sink.invoke(element, context)
>>>>>
>>>>> I can see the underlying sink getting initialized, the open call being
>>>>> made, etc. However, for some reason there seems to be an issue with the
>>>>> context during the invoke call, namely *"The runtime context has not
>>>>> been initialized"*. I had assumed this would be alright since
>>>>> the context for the "wrapper" should have already been initialized, but
>>>>> maybe there's something that I'm missing.
>>>>>
>>>>> Also, please forgive any hastily written or nasty code as this is
>>>>> purely a POC to see if I could get this to work using a single object. I
>>>>> have the hopes of cleaning it up and genericizing it after I am confident
>>>>> that it actually works.
>>>>>
>>>>> Thanks so much again,
>>>>>
>>>>> Rion
>>>>>
>>>>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <dm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Rion,
>>>>>>
>>>>>> Sorry for late reply, I've missed your previous message. Thanks Arvid
>>>>>> for the reminder <3.
>>>>>>
>>>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass
>>>>>>> those elements to the sink, which would create the tenant-specific Elastic
>>>>>>> connection from the ConfigurationT element and handle caching it
>>>>>>> and then just grab the element and send it on its way?
>>>>>>
>>>>>>
>>>>>> Yes, this is exactly what I had in mind. There should be almost no
>>>>>> overhead as sink can be easily chained with your join
>>>>>> (KeyedCoProcessFunction) function.
>>>>>>
>>>>>>> 1. The shape of the elements being evicted from the process
>>>>>>>    function (Is a simple wrapper with the configuration for the sink enough
>>>>>>>    here? Do I need to explicitly initialize the sink within this function?
>>>>>>>    Etc.)
>>>>>>
>>>>>> To write an element you need a configuration for the
>>>>>> destination and the element itself, so a tuple of *(ElasticConfiguration,
>>>>>> Element)* should be enough (that's basically your MessageWrapper<ElementT,
>>>>>> ConfigurationT> class).
>>>>>>
>>>>>>> 2. The actual use of the *DynamicElasticsearchSink* class (Would it
>>>>>>>    just be something like an *.addSink(DynamicElasticSearchSink<String,
>>>>>>>    Configuration>())* or perhaps something else entirely?)
>>>>>>
>>>>>> I guess it could look something like the snippet below. It would be
>>>>>> definitely good to play around with the *DynamicElasticSearchSink*
>>>>>> API and make it more meaningful / user friendly (the gist I've shared was
>>>>>> just a very rough prototype to showcase the idea).
>>>>>>
>>>>>>
>>>>>> static class Destination {
>>>>>>
>>>>>>     private final List<HttpHost> httpHosts;
>>>>>>
>>>>>>     Destination(List<HttpHost> httpHosts) {
>>>>>>         this.httpHosts = httpHosts;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>>>> toWrite.addSink(
>>>>>>         new DynamicElasticsearchSink<>(
>>>>>>                 new SinkRouter<
>>>>>>                         Tuple2<Destination, String>,
>>>>>>                         String,
>>>>>>                         ElasticsearchSink<Tuple2<Destination, String>>>() {
>>>>>>
>>>>>>                     @Override
>>>>>>                     public String getRoute(Tuple2<Destination, String> element) {
>>>>>>                         // Construct a deterministic unique caching key for the
>>>>>>                         // destination... (this could be cheaper if you know the data)
>>>>>>                         return element.f0.httpHosts.stream()
>>>>>>                                 .map(HttpHost::toHostString)
>>>>>>                                 .collect(Collectors.joining(","));
>>>>>>                     }
>>>>>>
>>>>>>                     @Override
>>>>>>                     public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>>>>>                             String cacheKey, Tuple2<Destination, String> element) {
>>>>>>                         return new ElasticsearchSink.Builder<>(
>>>>>>                                         element.f0.httpHosts,
>>>>>>                                         (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>>>>>                                                 (el, ctx, indexer) -> {
>>>>>>                                                     // Construct index request.
>>>>>>                                                     final IndexRequest request = ...;
>>>>>>                                                     indexer.add(request);
>>>>>>                                                 })
>>>>>>                                 .build();
>>>>>>                     }
>>>>>>                 }));
>>>>>>
>>>>>>
>>>>>> I hope this helps ;)
>>>>>>
>>>>>> Best,
>>>>>> D.
>>>>>>
>>>>>>
>>>>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <ri...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>>>>
>>>>>>> Since this will vary depending on the elements retrieved from a
>>>>>>> separate stream, I'm guessing something like the following would be
>>>>>>> roughly the avenue to continue down:
>>>>>>>
>>>>>>> fun main(args: Array<String>) {
>>>>>>>     val parameters = mergeParametersFromProperties(args)
>>>>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>>>
>>>>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>>>>     val connectionStream = stream
>>>>>>>         .fromSource(
>>>>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>             "elastic-configs"
>>>>>>>         )
>>>>>>>
>>>>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>>>>     stream
>>>>>>>         .fromSource(
>>>>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>             "messages"
>>>>>>>         )
>>>>>>>         .keyBy { message ->
>>>>>>>             // Key by the tenant in the message
>>>>>>>             message.getTenant()
>>>>>>>         }
>>>>>>>         .connect(
>>>>>>>             // Connect the messages stream with the configurations
>>>>>>>             connectionStream
>>>>>>>         )
>>>>>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>>>>>             // For this key, we need to store all of the previous messages in state
>>>>>>>             // in the case where we don't have a given mapping for this tenant yet
>>>>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>>>>             lateinit var configState: ValueState<String>
>>>>>>>
>>>>>>>             override fun open(parameters: Configuration) {
>>>>>>>                 super.open(parameters)
>>>>>>>                 // Initialize the states
>>>>>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>>>>             }
>>>>>>>
>>>>>>>             // When an element is received
>>>>>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>>>>>                 // Check if we have a mapping
>>>>>>>                 if (configState.value() == null){
>>>>>>>                     // We don't have a mapping for this tenant, store messages until we get it
>>>>>>>                     messagesAwaitingConfigState.add(message)
>>>>>>>                 }
>>>>>>>                 else {
>>>>>>>                     // Output the record with some indicator of the route?
>>>>>>>                     out.collect(message)
>>>>>>>                 }
>>>>>>>             }
>>>>>>>
>>>>>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>>>>>                 // If this mapping is for this specific tenant, store it and flush the pending
>>>>>>>                 // records in state
>>>>>>>                 if (config.getTenant() == context.currentKey){
>>>>>>>                     configState.update(config)
>>>>>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>>>>>                     messagesToFlush.forEach { message ->
>>>>>>>                         out.collect(message)
>>>>>>>                     }
>>>>>>>                 }
>>>>>>>             }
>>>>>>>
>>>>>>>             // State descriptors
>>>>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>>>>                 "messages-awaiting-config",
>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>             )
>>>>>>>
>>>>>>>             val configStateDesc = ValueStateDescriptor(
>>>>>>>                 "elastic-config",
>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>             )
>>>>>>>         })
>>>>>>>
>>>>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>>>>> }
>>>>>>>
>>>>>>> Basically, connect my tenant-specific configuration stream with my
>>>>>>> incoming messages (keyed by tenant) and buffer them until I have a
>>>>>>> corresponding configuration (to avoid race-conditions). However, I'm
>>>>>>> guessing what will happen here is rather than directly outputting the
>>>>>>> messages from this process function, I'd construct some type of wrapper
>>>>>>> here with the necessary routing/configuration for the message (obtained via
>>>>>>> the configuration stream) along with the element, which might be something
>>>>>>> like a MessageWrapper<ElementT, ConfigurationT> and pass those
>>>>>>> elements to the sink, which would create the tenant-specific Elastic
>>>>>>> connection from the ConfigurationT element and handle caching it
>>>>>>> and then just grab the element and send it on its way?
>>>>>>>
>>>>>>> Those are really the only bits I'm stuck on at the moment:
>>>>>>>
>>>>>>>    1. The shape of the elements being evicted from the process
>>>>>>>    function (Is a simple wrapper with the configuration for the sink enough
>>>>>>>    here? Do I need to explicitly initialize the sink within this function?
>>>>>>>    Etc.)
>>>>>>>    2. The actual use of the DynamicElasticsearchSink class (Would
>>>>>>>    it just be something like an .addSink(DynamicElasticSearchSink<String,
>>>>>>>    Configuration>()) or perhaps something else entirely?)
>>>>>>>
>>>>>>> Thanks again so much for the advice thus far David, it's greatly
>>>>>>> appreciated.
>>>>>>>
>>>>>>> Rion
>>>>>>>
>>>>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <dm...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> To give you a better idea, at a high level I think it could look
>>>>>>>> something like this
>>>>>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8>
>>>>>>>> [1].
>>>>>>>>
>>>>>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>>>>
>>>>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <
>>>>>>>> rionmonster@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi David,
>>>>>>>>>
>>>>>>>>> Thanks for your response! I think there are currently quite a few
>>>>>>>>> unknowns on my end in terms of what production loads look like, but I
>>>>>>>>> think the number of clusters shouldn’t be too large (and will either rarely
>>>>>>>>> change or have new entries come in at runtime, but it needs to support
>>>>>>>>> that).
>>>>>>>>>
>>>>>>>>> I think the dynamic approach might be a good route to explore with
>>>>>>>>> actual changes to the Elasticsearch sink as a longer term option. I’m not
>>>>>>>>> sure what the dynamic one would look like at the moment though, perhaps
>>>>>>>>> that’s something you’d be able to advise on?
>>>>>>>>>
>>>>>>>>> Given that all the records are keyed for a given tenant and I
>>>>>>>>> would have the mappings stored in state, is it possible that within the
>>>>>>>>> open() function for this dynamic route to access the state and initialize
>>>>>>>>> the client there? Or maybe there’s some other approach (such as grouping by
>>>>>>>>> clusters and dynamically handling indices)?
>>>>>>>>>
>>>>>>>>> I’d be happy to give a shot at making the appropriate changes to
>>>>>>>>> the sink as well, although I’m far from an Elastic expert. If you point me
>>>>>>>>> in the right direction, I may be able to help out.
>>>>>>>>>
>>>>>>>>> Thanks much!
>>>>>>>>>
>>>>>>>>> Rion
>>>>>>>>>
>>>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <dm...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi Rion,
>>>>>>>>>
>>>>>>>>> As you probably already know, for dynamic indices, you can simply
>>>>>>>>> implement your own ElasticsearchSinkFunction
>>>>>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>>>>>> [1], where you can create any request that elastic client supports.
>>>>>>>>>
>>>>>>>>> The tricky part is how to implement dynamic routing into multiple
>>>>>>>>> clusters.
>>>>>>>>> - If the elastic clusters are known upfront (before submitting
>>>>>>>>> job), you can easily create multiple elastic sinks and prepend them with a
>>>>>>>>> simple filter (this is basically what split operator does).
>>>>>>>>> - If you discover Elasticsearch clusters at runtime, this would require
>>>>>>>>> some changes of the current ElasticsearchSink implementation. I think this
>>>>>>>>> may be actually as simple as introducing something like
>>>>>>>>> DynamicElasticsearchSink, that could dynamically create and manage "child"
>>>>>>>>> sinks. This would probably require some thoughts about how to manage
>>>>>>>>> consumed resources (memory), because the number of child sinks could be
>>>>>>>>> potentially unbounded. This could be of course simplified if underlying
>>>>>>>>> elastic client already supports that, which I'm not aware of. If you'd like
>>>>>>>>> to take this path, it would definitely be a great contribution to Flink
>>>>>>>>> (I'm able to provide some guidance).
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> D.
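
The resource concern raised above (a potentially unbounded number of child sinks) could be handled by capping how many sinks stay open at once. Below is a minimal sketch in plain Java, assuming a least-recently-used eviction policy is acceptable. `BoundedSinkCache` and `getOrCreate` are hypothetical names, not Flink or Elasticsearch APIs, and a real sink would also need to flush pending requests before an evicted child is closed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: keeps at most maxSinks child sinks open and closes
// the least recently used one when the limit is exceeded.
class BoundedSinkCache<S extends AutoCloseable> {
    private final LinkedHashMap<String, S> sinks;

    BoundedSinkCache(int maxSinks) {
        // accessOrder = true makes iteration order follow recency of use.
        this.sinks = new LinkedHashMap<String, S>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, S> eldest) {
                if (size() > maxSinks) {
                    closeQuietly(eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    S getOrCreate(String route, Function<String, S> factory) {
        S sink = sinks.get(route); // get() refreshes recency in access-order mode
        if (sink == null) {
            sink = factory.apply(route);
            sinks.put(route, sink); // may evict and close the eldest entry
        }
        return sink;
    }

    boolean contains(String route) {
        return sinks.containsKey(route);
    }

    int size() {
        return sinks.size();
    }

    private static void closeQuietly(AutoCloseable closeable) {
        try {
            closeable.close();
        } catch (Exception e) {
            // Ignored in this sketch; real code should surface the failure.
        }
    }
}
```

Whether eviction is safe depends on the delivery guarantees required; this sketch only shows the bookkeeping, not checkpoint integration.
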
>>>>>>>>>
>>>>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <
>>>>>>>>> rionmonster@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi folks,
>>>>>>>>>>
>>>>>>>>>> I have a use-case that I wanted to initially pose to the mailing
>>>>>>>>>> list as I’m not terribly familiar with the Elasticsearch connector to
>>>>>>>>>> ensure I’m not going down the wrong path trying to accomplish this in Flink
>>>>>>>>>> (or if something downstream might be a better option).
>>>>>>>>>>
>>>>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>>>>
>>>>>>>>>>    - A stream of tenant-specific events
>>>>>>>>>>    - An HTTP endpoint containing mappings for tenant-specific
>>>>>>>>>>    Elastic cluster information (as each tenant has its own specific Elastic
>>>>>>>>>>    cluster/index)
>>>>>>>>>>
>>>>>>>>>> What I’m hoping to accomplish is the following:
>>>>>>>>>>
>>>>>>>>>>    1. One stream will periodically poll the HTTP endpoint and
>>>>>>>>>>    store these cluster mappings in state (keyed by tenant with cluster info as
>>>>>>>>>>    the value)
>>>>>>>>>>    2. The event stream will be keyed by tenant and connected to
>>>>>>>>>>    the cluster mappings stream.
>>>>>>>>>>    3. I’ll need an Elasticsearch sink that can route the
>>>>>>>>>>    tenant-specific event data to its corresponding cluster/index from the
>>>>>>>>>>    mapping source.
>>>>>>>>>>
>>>>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>>>>> indices, however I didn’t know if it’s possible to adjust the cluster like
>>>>>>>>>> I would need on a per-tenant basis or if there’s a better approach here?
>>>>>>>>>>
>>>>>>>>>> Any advice would be appreciated.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Rion
>>>>>>>>>>
>>>>>>>>>

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

Posted by Rion Williams <ri...@gmail.com>.
Thanks again David,

I've spun up a JIRA issue for the ticket
<https://issues.apache.org/jira/browse/FLINK-23977> while I work on getting
things into the proper state. If someone with the
appropriate privileges could assign it to me, I'd be appreciative. I'll
likely need some assistance at a few points to ensure things look as
expected, but I'm happy to help with this contribution.

Rion

On Wed, Aug 25, 2021 at 11:37 AM David Morávek <dm...@apache.org> wrote:

> AFAIK there are currently no other sources in Flink that can treat "other
> sources" / "destination" as data. The most complete generic work on this topic
> that I'm aware of is the Splittable DoFn based IOs in Apache Beam.
>
> I think the best module for the contribution would be
> "elasticsearch-base", because this could be easily reused for all ES
> versions that we currently support.
>
> Best,
> D.
>
> On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <ri...@gmail.com>
> wrote:
>
>> Hi David,
>>
>> That was perfect and it looks like this is working as I'd expected. I put
>> together some larger integration tests for my specific use-case (multiple
>> Elasticsearch clusters running in TestContainers) and verified that
>> messages were being routed dynamically to the appropriate sinks. I forked
>> the Flink repo last night and was trying to figure out the best place to
>> start adding these classes in (I noticed that there were three separate ES
>> packages targeting 5/6/7 respectively). I was going to try to start
>> fleshing the initial implementation for this, but wanted to make sure that
>> I was starting in the right place.
>>
>> Additionally, do you know of anything that might be similar to this even
>> within other sinks? Just trying to think of something to model this after.
>> Once I get things started, I'll spin up a JIRA issue for it and go from
>> there.
>>
>> Thanks so much for your help!
>>
>> Rion
>>
>> On Tue, Aug 24, 2021 at 1:45 AM David Morávek <dm...@apache.org> wrote:
>>
>>> Hi Rion,
>>>
>>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>>> before opening the child sink. Please see *AbstractRichFunction* [1]
>>> (that ElasticsearchSink extends) for more details.
>>>
>>> One more note, instead of starting with integration test, I'd recommend
>>> writing a unit test using *operator test harness* [2] first. This
>>> should help you to discover / debug many issues upfront. You can use
>>> *ElasticsearchSinkBaseTest* [3] as an example.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>>> [3]
>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>>>
>>> Best,
>>> D.
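
The ordering described above (hand the runtime context to the child sink, then open it, then invoke it) can be illustrated without Flink on the classpath. This is a simplified model, not the connector's code: `FakeRuntimeContext`, `ChildSink`, and `DynamicSink` are hypothetical stand-ins that reproduce only the relevant behavior, namely that a child which never received a context fails on invoke with the message reported in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for Flink's RuntimeContext; for illustration only.
class FakeRuntimeContext {}

// Stand-in for a rich sink: open() succeeds without a context, but
// invoke() looks the context up and fails if it was never set.
class ChildSink {
    private FakeRuntimeContext runtimeContext;
    private boolean opened;
    final StringBuilder written = new StringBuilder();

    void setRuntimeContext(FakeRuntimeContext context) {
        this.runtimeContext = context;
    }

    FakeRuntimeContext getRuntimeContext() {
        if (runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return runtimeContext;
    }

    void open() {
        opened = true;
    }

    void invoke(String element) {
        if (!opened) {
            throw new IllegalStateException("The sink has not been opened.");
        }
        getRuntimeContext(); // mirrors the context lookup made while invoking
        written.append(element).append(';');
    }
}

// Wrapper that lazily creates one child per route and propagates its own
// context to each child before opening it.
class DynamicSink {
    private final Map<String, ChildSink> sinks = new HashMap<>();
    private final FakeRuntimeContext runtimeContext = new FakeRuntimeContext();

    void invoke(String route, String element) {
        ChildSink sink = sinks.get(route);
        if (sink == null) {
            sink = new ChildSink();
            sink.setRuntimeContext(runtimeContext); // must happen before open()
            sink.open();
            sinks.put(route, sink);
        }
        sink.invoke(element);
    }

    ChildSink sinkFor(String route) {
        return sinks.get(route);
    }
}
```

In the Kotlin wrapper from this thread, the equivalent change would be a single `sink.setRuntimeContext(runtimeContext)` call placed before `sink.open(configuration)`.
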
>>>
>>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <ri...@gmail.com>
>>> wrote:
>>>
>>>> Hi David,
>>>>
>>>> Thanks again for the response, I believe that I'm getting pretty close
>>>> for at least a POC-level implementation of this. Currently, I'm working
>>>> with JsonObject instances throughout the pipeline, so I wanted to try this
>>>> out and simply stored the routing information within the element itself for
>>>> simplicity's sake right now, so it has a shape that looks something like
>>>> this:
>>>>
>>>> {
>>>>     "route": {
>>>>         "hosts": "...",
>>>>         "index": "...",
>>>>         ...
>>>>     },
>>>>     "all-other-fields-here"
>>>> }
>>>>
>>>> And I've stripped back several of the layers of the routers (since I
>>>> already have all of the information in the element at that point). I tried
>>>> using something like this:
>>>>
>>>> class DynamicElasticsearchSink: RichSinkFunction<JsonObject>(), CheckpointedFunction {
>>>>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>>>     private lateinit var configuration: Configuration
>>>>
>>>>     override fun open(parameters: Configuration) {
>>>>         configuration = parameters
>>>>     }
>>>>
>>>>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>>>>         val route = getHost(element)
>>>>         // Check if we already have a router for this cluster
>>>>         var sink = sinkRoutes[route]
>>>>         if (sink == null) {
>>>>             // If not, create one
>>>>             sink = buildSinkFromRoute(element)
>>>>             sink.open(configuration)
>>>>             sinkRoutes[route] = sink
>>>>         }
>>>>
>>>>         sink.invoke(element, context)
>>>>     }
>>>>
>>>>     override fun initializeState(context: FunctionInitializationContext) {
>>>>         // No-op.
>>>>     }
>>>>
>>>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>>>         // This is used only to flush pending writes.
>>>>         for (sink in sinkRoutes.values) {
>>>>             sink.snapshotState(context)
>>>>         }
>>>>     }
>>>>
>>>>     override fun close() {
>>>>         for (sink in sinkRoutes.values) {
>>>>             sink.close()
>>>>         }
>>>>     }
>>>>
>>>>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>>>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>>>             buildHostsFromElement(element),
>>>>             ElasticsearchRoutingFunction()
>>>>         )
>>>>
>>>>         builder.setBulkFlushMaxActions(1)
>>>>
>>>>         // TODO: Configure authorization if available
>>>> //        builder.setRestClientFactory { restClient ->
>>>> //            restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
>>>> //                override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>>>> //                    // Configure authorization here
>>>> //                    val credentialsProvider = BasicCredentialsProvider().apply {
>>>> //                        setCredentials(
>>>> //                            AuthScope.ANY,
>>>> //                            UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
>>>> //                        )
>>>> //                    }
>>>> //
>>>> //                    return builder.setDefaultCredentialsProvider(credentialsProvider);
>>>> //                }
>>>> //            })
>>>> //        }
>>>>
>>>>         return builder.build()
>>>>     }
>>>>
>>>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost>{
>>>>         val transportAddresses = element
>>>>             .get("route").asJsonObject
>>>>             .get("hosts").asString
>>>>
>>>>         // If there are multiple, they should be comma-delimited
>>>>         val addresses = transportAddresses.split(",")
>>>>         return addresses
>>>>             .filter { address -> address.isNotEmpty() }
>>>>             .map { address ->
>>>>                 HttpHost.create(address)
>>>>             }
>>>>     }
>>>>
>>>>     private fun getHost(element: JsonObject): String {
>>>>         return element
>>>>             .get("route").asJsonObject
>>>>             .get("hosts").asString
>>>>     }
>>>>
>>>>     private class ElasticsearchRoutingFunction: ElasticsearchSinkFunction<JsonObject> {
>>>>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>>>>             indexer.add(request(element))
>>>>         }
>>>>
>>>>         private fun request(element: JsonObject): IndexRequest {
>>>>             // Access routing information
>>>>             val index = element
>>>>                 .get("route").asJsonObject
>>>>                 .get("index").asString
>>>>
>>>>             // Strip off routing information
>>>>             element.remove("route")
>>>>
>>>>             // Send the request
>>>>             return Requests.indexRequest()
>>>>                 .index(index)
>>>>                 .type("_doc")
>>>>                 .source(mapOf(
>>>>                     "data" to "$element"
>>>>                 ))
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> After running an integration test, I keep running into the
>>>> following error during the invocation of the child sink:
>>>>
>>>> // The runtime context has not been initialized.
>>>> sink.invoke(element, context)
>>>>
>>>> I can see the underlying sink getting initialized, the open call being
>>>> made, etc. however for some reason it looks like there's an issue related
>>>> to the context during the invoke call, namely *"The runtime context has
>>>> not been initialized"*. I had assumed this would be alright since the
>>>> context for the "wrapper" should have already been initialized, but maybe
>>>> there's something that I'm missing.
>>>>
>>>> Also, please forgive any hastily written or nasty code as this is
>>>> purely a POC to see if I could get this to work using a single object. I
>>>> have the hopes of cleaning it up and genericizing it after I am confident
>>>> that it actually works.
>>>>
>>>> Thanks so much again,
>>>>
>>>> Rion
>>>>
>>>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <dm...@apache.org> wrote:
>>>>
>>>>> Hi Rion,
>>>>>
>>>>> Sorry for late reply, I've missed your previous message. Thanks Arvid
>>>>> for the reminder <3.
>>>>>
>>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass
>>>>>> those elements to the sink, which would create the tenant-specific Elastic
>>>>>> connection from the ConfigurationT element and handle caching it and
>>>>>> then just grab the element and send it on its way?
>>>>>
>>>>>
>>>>> Yes, this is exactly what I had in mind. There should be almost no
>>>>> overhead as sink can be easily chained with your join
>>>>> (KeyedCoProcessFunction) function.
>>>>>
>>>>>> 1. The shape of the elements being evicted from the process function
>>>>>>    (Is a simple wrapper with the configuration for the sink enough here? Do I
>>>>>>    need to explicitly initialize the sink within this function? Etc.)
>>>>>
>>>>> To write an element you need a configuration for the destination
>>>>> and the element itself, so a tuple of *(ElasticConfiguration,
>>>>> Element)* should be enough (that's basically your MessageWrapper<ElementT,
>>>>> ConfigurationT> class).
>>>>>
>>>>>> 2. The actual use of the *DynamicElasticsearchSink* class (Would it
>>>>>>    just be something like an *.addSink(DynamicElasticSearchSink<String,
>>>>>>    Configuration>())* or perhaps something else entirely?)
>>>>>
>>>>> I guess it could look something like the snippet below. It would be
>>>>> definitely good to play around with the *DynamicElasticSearchSink*
>>>>> API and make it more meaningful / user friendly (the gist I've shared was
>>>>> just a very rough prototype to showcase the idea).
>>>>>
>>>>>
>>>>> static class Destination {
>>>>>
>>>>>     private final List<HttpHost> httpHosts;
>>>>>
>>>>>     Destination(List<HttpHost> httpHosts) {
>>>>>         this.httpHosts = httpHosts;
>>>>>     }
>>>>> }
>>>>>
>>>>> final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>>> toWrite.addSink(
>>>>>         new DynamicElasticsearchSink<>(
>>>>>                 new SinkRouter<
>>>>>                         Tuple2<Destination, String>,
>>>>>                         String,
>>>>>                         ElasticsearchSink<Tuple2<Destination, String>>>() {
>>>>>
>>>>>                     @Override
>>>>>                     public String getRoute(Tuple2<Destination, String> element) {
>>>>>                         // Construct a deterministic unique caching key for the
>>>>>                         // destination... (this could be cheaper if you know the data)
>>>>>                         return element.f0.httpHosts.stream()
>>>>>                                 .map(HttpHost::toHostString)
>>>>>                                 .collect(Collectors.joining(","));
>>>>>                     }
>>>>>
>>>>>                     @Override
>>>>>                     public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>>>>                             String cacheKey, Tuple2<Destination, String> element) {
>>>>>                         return new ElasticsearchSink.Builder<>(
>>>>>                                         element.f0.httpHosts,
>>>>>                                         (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>>>>                                                 (el, ctx, indexer) -> {
>>>>>                                                     // Construct index request.
>>>>>                                                     final IndexRequest request = ...;
>>>>>                                                     indexer.add(request);
>>>>>                                                 })
>>>>>                                 .build();
>>>>>                     }
>>>>>                 }));
>>>>>
>>>>>
>>>>> I hope this helps ;)
>>>>>
>>>>> Best,
>>>>> D.
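
On the "deterministic unique caching key" mentioned in the snippet above: joining hosts in list order yields different keys if the same cluster is ever listed in a different order. A possible normalization, sketched in plain Java, is shown below. It assumes hosts arrive as `host:port` strings; `RouteKeys.cacheKey` is a hypothetical helper, not part of Flink or the Elasticsearch client.

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical helper that builds an order-independent cache key.
class RouteKeys {
    static String cacheKey(List<String> hosts) {
        return hosts.stream()
                .map(String::trim)
                .filter(host -> !host.isEmpty())
                .map(host -> host.toLowerCase(Locale.ROOT)) // host names are case-insensitive
                .distinct()
                .sorted()
                .collect(Collectors.joining(","));
    }
}
```

With this normalization, two mappings that name the same hosts in a different order resolve to the same cached child sink.
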
>>>>>
>>>>>
>>>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <ri...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>>>
>>>>>> Since this will vary depending on the elements retrieved from a
>>>>>> separate stream, I'm guessing something like the following would be
>>>>>> roughly the avenue to continue down:
>>>>>>
>>>>>> fun main(args: Array<String>) {
>>>>>>     val parameters = mergeParametersFromProperties(args)
>>>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>>
>>>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>>>     val connectionStream = stream
>>>>>>         .fromSource(
>>>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>             "elastic-configs"
>>>>>>         )
>>>>>>
>>>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>>>     stream
>>>>>>         .fromSource(
>>>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>             "messages"
>>>>>>         )
>>>>>>         .keyBy { message ->
>>>>>>             // Key by the tenant in the message
>>>>>>             message.getTenant()
>>>>>>         }
>>>>>>         .connect(
>>>>>>             // Connect the messages stream with the configurations
>>>>>>             connectionStream
>>>>>>         )
>>>>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>>>>             // For this key, we need to store all of the previous messages in state
>>>>>>             // in the case where we don't have a given mapping for this tenant yet
>>>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>>>             lateinit var configState: ValueState<String>
>>>>>>
>>>>>>             override fun open(parameters: Configuration) {
>>>>>>                 super.open(parameters)
>>>>>>                 // Initialize the states
>>>>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>>>             }
>>>>>>
>>>>>>             // When an element is received
>>>>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>>>>                 // Check if we have a mapping
>>>>>>                 if (configState.value() == null){
>>>>>>                     // We don't have a mapping for this tenant, store messages until we get it
>>>>>>                     messagesAwaitingConfigState.add(message)
>>>>>>                 }
>>>>>>                 else {
>>>>>>                     // Output the record with some indicator of the route?
>>>>>>                     out.collect(message)
>>>>>>                 }
>>>>>>             }
>>>>>>
>>>>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>>>>                 // If this mapping is for this specific tenant, store it and flush the pending
>>>>>>                 // records in state
>>>>>>                 if (config.getTenant() == context.currentKey){
>>>>>>                     configState.update(config)
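>>>>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>>>>                     messagesToFlush.forEach { message ->
>>>>>>                         out.collect(message)
>>>>>>                     }
>>>>>>                     // Clear the buffer so flushed messages are not emitted again
>>>>>>                     messagesAwaitingConfigState.clear()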
>>>>>>                 }
>>>>>>             }
>>>>>>
>>>>>>             // State descriptors
>>>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>>>                 "messages-awaiting-config",
>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>             )
>>>>>>
>>>>>>             val configStateDesc = ValueStateDescriptor(
>>>>>>                 "elastic-config",
>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>             )
>>>>>>         })
>>>>>>
>>>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>>>> }
>>>>>>
>>>>>> Basically, connect my tenant-specific configuration stream with my
>>>>>> incoming messages (keyed by tenant) and buffer them until I have a
>>>>>> corresponding configuration (to avoid race-conditions). However, I'm
>>>>>> guessing what will happen here is rather than directly outputting the
>>>>>> messages from this process function, I'd construct some type of wrapper
>>>>>> here with the necessary routing/configuration for the message (obtained via
>>>>>> the configuration stream) along with the element, which might be something
>>>>>> like a MessageWrapper<ElementT, ConfigurationT> and pass those
>>>>>> elements to the sink, which would create the tenant-specific Elastic
>>>>>> connection from the ConfigurationT element and handle caching it and
>>>>>> then just grab the element and send it on its way?
>>>>>>
>>>>>> Those are really the only bits I'm stuck on at the moment:
>>>>>>
>>>>>>    1. The shape of the elements being evicted from the process
>>>>>>    function (Is a simple wrapper with the configuration for the sink enough
>>>>>>    here? Do I need to explicitly initialize the sink within this function?
>>>>>>    Etc.)
>>>>>>    2. The actual use of the DynamicElasticsearchSink class (Would it
>>>>>>    just be something like an .addSink(DynamicElasticSearchSink<String,
>>>>>>    Configuration>()) or perhaps something else entirely?)
>>>>>>
>>>>>> Thanks again so much for the advice thus far David, it's greatly
>>>>>> appreciated.
>>>>>>
>>>>>> Rion
>>>>>>
>>>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <dm...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> To give you a better idea, at a high level I think it could look
>>>>>>> something like this
>>>>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8> [1].
>>>>>>>
>>>>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>>>
>>>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <ri...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi David,
>>>>>>>>
>>>>>>>> Thanks for your response! I think there are currently quite a few
>>>>>>>> unknowns on my end in terms of what production loads look like, but I
>>>>>>>> think the number of clusters shouldn’t be too large (and will either rarely
>>>>>>>> change or have new entries come in at runtime, but it needs to support
>>>>>>>> that).
>>>>>>>>
>>>>>>>> I think the dynamic approach might be a good route to explore with
>>>>>>>> actual changes to the Elasticsearch sink as a longer term option. I’m not
>>>>>>>> sure what the dynamic one would look like at the moment though, perhaps
>>>>>>>> that’s something you’d be able to advise on?
>>>>>>>>
>>>>>>>> Given that all the records are keyed for a given tenant and I would
>>>>>>>> have the mappings stored in state, is it possible that within the open()
>>>>>>>> function for this dynamic route to access the state and initialize the
>>>>>>>> client there? Or maybe there’s some other approach (such as grouping by
>>>>>>>> clusters and dynamically handling indices)?
>>>>>>>>
>>>>>>>> I’d be happy to give a shot at making the appropriate changes to
>>>>>>>> the sink as well, although I’m far from an Elastic expert. If you point me
>>>>>>>> in the right direction, I may be able to help out.
>>>>>>>>
>>>>>>>> Thanks much!
>>>>>>>>
>>>>>>>> Rion
>>>>>>>>
>>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <dm...@apache.org> wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>> Hi Rion,
>>>>>>>>
>>>>>>>> As you probably already know, for dynamic indices, you can simply
>>>>>>>> implement your own ElasticsearchSinkFunction
>>>>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>>>>> [1], where you can create any request that elastic client supports.
>>>>>>>>
>>>>>>>> The tricky part is how to implement dynamic routing into multiple
>>>>>>>> clusters.
>>>>>>>> - If the elastic clusters are known upfront (before submitting
>>>>>>>> job), you can easily create multiple elastic sinks and prepend them with a
>>>>>>>> simple filter (this is basically what split operator does).
>>>>>>>> - If you discover Elastic clusters at runtime, this would require
>>>>>>>> some changes to the current ElasticsearchSink implementation. I think this
>>>>>>>> may actually be as simple as introducing something like
>>>>>>>> DynamicElasticsearchSink, that could dynamically create and manage "child"
>>>>>>>> sinks. This would probably require some thought about how to manage
>>>>>>>> consumed resources (memory), because the number of child sinks could
>>>>>>>> potentially be unbounded. This could of course be simplified if the underlying
>>>>>>>> elastic client already supports that, which I'm not aware of. If you'd like
>>>>>>>> to take this path, it would definitely be a great contribution to Flink
>>>>>>>> (I'm able to provide some guidance).
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> D.
>>>>>>>>
>>>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <ri...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi folks,
>>>>>>>>>
>>>>>>>>> I have a use-case that I wanted to initially pose to the mailing
>>>>>>>>> list as I’m not terribly familiar with the Elasticsearch connector to
>>>>>>>>> ensure I’m not going down the wrong path trying to accomplish this in Flink
>>>>>>>>> (or if something downstream might be a better option).
>>>>>>>>>
>>>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>>>
>>>>>>>>>    - A stream of tenant-specific events
>>>>>>>>>    - An HTTP endpoint containing mappings for tenant-specific
>>>>>>>>>    Elastic cluster information (as each tenant has its own specific Elastic
>>>>>>>>>    cluster/index)
>>>>>>>>>
>>>>>>>>> What I’m hoping to accomplish is the following:
>>>>>>>>>
>>>>>>>>>    1. One stream will periodically poll the HTTP endpoint and
>>>>>>>>>    store these cluster mappings in state (keyed by tenant with cluster info as
>>>>>>>>>    the value)
>>>>>>>>>    2. The event stream will be keyed by tenant and connected to
>>>>>>>>>    the cluster mappings stream.
>>>>>>>>>    3. I’ll need an Elasticsearch sink that can route the
>>>>>>>>>    tenant-specific event data to its corresponding cluster/index from the
>>>>>>>>>    mapping source.
>>>>>>>>>
>>>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>>>> indices, however I didn’t know if it’s possible to adjust the cluster like
>>>>>>>>> I would need on a per-tenant basis or if there’s a better approach here?
>>>>>>>>>
>>>>>>>>> Any advice would be appreciated.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Rion
>>>>>>>>>
>>>>>>>>

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

Posted by David Morávek <dm...@apache.org>.
AFAIK there are currently no other sources in Flink that can treat "other
sources" / "destination" as data. The most complete generic work on this topic
that I'm aware of is the Splittable DoFn based IOs in Apache Beam.

I think the best module for the contribution would be "elasticsearch-base",
because this could be easily reused for all ES versions that we currently
support.
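
To make that a bit more concrete, here is a rough sketch of what the
version-independent part might look like. It is plain Java with hypothetical
stand-in types (ChildSink, DynamicSink and Routed are made up for illustration
and are not Flink classes); only the SinkRouter shape follows the snippet from
earlier in the thread, and its exact signatures are assumed:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DynamicSinkSketch {

    /** Hypothetical stand-in for a version-specific sink (ES 5/6/7); not a Flink type. */
    interface ChildSink<T> {
        void invoke(T element);
    }

    /** Shaped after the SinkRouter in the earlier snippet; signatures are assumed. */
    interface SinkRouter<T, RouteT, SinkT extends ChildSink<T>> {
        RouteT getRoute(T element);

        SinkT createSink(RouteT cacheKey, T element);
    }

    /** Version-independent wrapper: one lazily created child sink per distinct route. */
    static final class DynamicSink<T, RouteT, SinkT extends ChildSink<T>> {
        private final SinkRouter<T, RouteT, SinkT> router;
        private final Map<RouteT, SinkT> children = new HashMap<>();

        DynamicSink(SinkRouter<T, RouteT, SinkT> router) {
            this.router = router;
        }

        void invoke(T element) {
            RouteT route = router.getRoute(element);
            SinkT child = children.get(route);
            if (child == null) {
                // First element for this route: build the child and cache it.
                child = router.createSink(route, element);
                children.put(route, child);
            }
            child.invoke(element);
        }

        int childCount() {
            return children.size();
        }
    }

    /** Demo element: a destination (comma-delimited hosts) plus a payload. */
    static final class Routed {
        final String hosts;
        final String payload;

        Routed(String hosts, String payload) {
            this.hosts = hosts;
            this.payload = payload;
        }
    }

    /** Routes three elements to two destinations; returns what each child "wrote". */
    static List<String> demo(int[] childCountOut) {
        final List<String> written = new ArrayList<>();
        SinkRouter<Routed, String, ChildSink<Routed>> router =
                new SinkRouter<Routed, String, ChildSink<Routed>>() {
                    @Override
                    public String getRoute(Routed element) {
                        return element.hosts;
                    }

                    @Override
                    public ChildSink<Routed> createSink(String cacheKey, Routed element) {
                        // A real implementation would build an ElasticsearchSink here.
                        return el -> written.add(cacheKey + " <- " + el.payload);
                    }
                };
        DynamicSink<Routed, String, ChildSink<Routed>> sink = new DynamicSink<>(router);
        sink.invoke(new Routed("es-a:9200", "one"));
        sink.invoke(new Routed("es-b:9200", "two"));
        sink.invoke(new Routed("es-a:9200", "three"));
        childCountOut[0] = sink.childCount();
        return written;
    }

    public static void main(String[] args) {
        int[] count = new int[1];
        demo(count).forEach(System.out::println);
        System.out.println("child sinks: " + count[0]);
    }
}
```

Nothing in the wrapper or the router depends on a particular ES version, which
is why the base module looks like the natural home; only createSink would need
to know which concrete sink to build.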

Best,
D.

On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <ri...@gmail.com> wrote:

> Hi David,
>
> That was perfect and it looks like this is working as I'd expected. I put
> together some larger integration tests for my specific use-case (multiple
> Elasticsearch clusters running in TestContainers) and verified that
> messages were being routed dynamically to the appropriate sinks. I forked
> the Flink repo last night and was trying to figure out the best place to
> start adding these classes in (I noticed that there were three separate ES
> packages targeting 5/6/7 respectively). I was going to try to start
> fleshing out the initial implementation for this, but wanted to make sure that
> I was starting in the right place.
>
> Additionally, do you know of anything that might be similar to this even
> within other sinks? Just trying to think of something to model this after.
> Once I get things started, I'll spin up a JIRA issue for it and go from
> there.
>
> Thanks so much for your help!
>
> Rion
>
> On Tue, Aug 24, 2021 at 1:45 AM David Morávek <dm...@apache.org> wrote:
>
>> Hi Rion,
>>
>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>> before opening the child sink. Please see *AbstractRichFunction* [1]
>> (that ElasticsearchSink extends) for more details.
>>
>> One more note: instead of starting with an integration test, I'd recommend
>> writing a unit test using the *operator test harness* [2] first. This should
>> help you to discover / debug many issues upfront. You can use
>> *ElasticsearchSinkBaseTest* [3] as an example.
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>> [3]
>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>>
>> Best,
>> D.
>>
>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <ri...@gmail.com>
>> wrote:
>>
>>> Hi David,
>>>
>>> Thanks again for the response, I believe that I'm getting pretty close
>>> for at least a POC-level implementation of this. Currently, I'm working
>>> with JsonObject instances throughout the pipeline, so I wanted to try this
>>> out and simply stored the routing information within the element itself for
>>> simplicity's sake right now, so it has a shape that looks something like
>>> this:
>>>
>>> {
>>>     "route": {
>>>         "hosts": "...",
>>>         "index": "...",
>>>         ...
>>>     },
>>>     "all-other-fields-here"
>>> }
>>>
>>> And I've stripped back several of the layers of the routers (since I
>>> already have all of the information in the element at that point). I tried
>>> using something like this:
>>>
>>> class DynamicElasticsearchSink: RichSinkFunction<JsonObject>(), CheckpointedFunction {
>>>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>>     private lateinit var configuration: Configuration
>>>
>>>     override fun open(parameters: Configuration) {
>>>         configuration = parameters
>>>     }
>>>
>>>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>>>         val route = getHost(element)
>>>         // Check if we already have a router for this cluster
>>>         var sink = sinkRoutes[route]
>>>         if (sink == null) {
>>>             // If not, create one
>>>             sink = buildSinkFromRoute(element)
>>>             sink.open(configuration)
>>>             sinkRoutes[route] = sink
>>>         }
>>>
>>>         sink.invoke(element, context)
>>>     }
>>>
>>>     override fun initializeState(context: FunctionInitializationContext) {
>>>         // No-op.
>>>     }
>>>
>>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>>         // This is used only to flush pending writes.
>>>         for (sink in sinkRoutes.values) {
>>>             sink.snapshotState(context)
>>>         }
>>>     }
>>>
>>>     override fun close() {
>>>         for (sink in sinkRoutes.values) {
>>>             sink.close()
>>>         }
>>>     }
>>>
>>>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>>             buildHostsFromElement(element),
>>>             ElasticsearchRoutingFunction()
>>>         )
>>>
>>>         builder.setBulkFlushMaxActions(1)
>>>
>>>         // TODO: Configure authorization if available
>>> //        builder.setRestClientFactory { restClient ->
>>> //            restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
>>> //                override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>>> //                    // Configure authorization here
>>> //                    val credentialsProvider = BasicCredentialsProvider().apply {
>>> //                        setCredentials(
>>> //                            AuthScope.ANY,
>>> //                            UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
>>> //                        )
>>> //                    }
>>> //
>>> //                    return builder.setDefaultCredentialsProvider(credentialsProvider);
>>> //                }
>>> //            })
>>> //        }
>>>
>>>         return builder.build()
>>>     }
>>>
>>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost>{
>>>         val transportAddresses = element
>>>             .get("route").asJsonObject
>>>             .get("hosts").asString
>>>
>>>         // If there are multiple, they should be comma-delimited
>>>         val addresses = transportAddresses.split(",")
>>>         return addresses
>>>             .filter { address -> address.isNotEmpty() }
>>>             .map { address ->
>>>                 HttpHost.create(address)
>>>             }
>>>     }
>>>
>>>     private fun getHost(element: JsonObject): String {
>>>         return element
>>>             .get("route").asJsonObject
>>>             .get("hosts").asString
>>>     }
>>>
>>>     private class ElasticsearchRoutingFunction: ElasticsearchSinkFunction<JsonObject> {
>>>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>>>             indexer.add(request(element))
>>>         }
>>>
>>>         private fun request(element: JsonObject): IndexRequest {
>>>             // Access routing information
>>>             val index = element
>>>                 .get("route").asJsonObject
>>>                 .get("index").asString
>>>
>>>             // Strip off routing information
>>>             element.remove("route")
>>>
>>>             // Send the request
>>>             return Requests.indexRequest()
>>>                 .index(index)
>>>                 .type("_doc")
>>>                 .source(mapOf(
>>>                     "data" to "$element"
>>>                 ))
>>>         }
>>>     }
>>> }
>>>
>>> After running an integration test, I keep running into the
>>> following error during the invocation of the child sink:
>>>
>>> // The runtime context has not been initialized.
>>> sink.invoke(element, context)
>>>
>>> I can see the underlying sink getting initialized, the open call being
>>> made, etc. However, for some reason it looks like there's an issue related
>>> to the context during the invoke call, namely *"The runtime context has
>>> not been initialized"*. I had assumed this would be alright since the
>>> context for the "wrapper" should have already been initialized, but maybe
>>> there's something that I'm missing.
>>>
>>> Also, please forgive any hastily written or nasty code as this is purely
>>> a POC to see if I could get this to work using a single object. I have the
>>> hopes of cleaning it up and genericizing it after I am confident that it
>>> actually works.
>>>
>>> Thanks so much again,
>>>
>>> Rion
>>>
>>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <dm...@apache.org> wrote:
>>>
>>>> Hi Rion,
>>>>
>>>> Sorry for the late reply, I missed your previous message. Thanks Arvid
>>>> for the reminder <3.
>>>>
>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass
>>>>> those elements to the sink, which would create the tenant-specific Elastic
>>>>> connection from the ConfigurationT element and handle caching it and
>>>>> then just grab the element and send it on its way?
>>>>
>>>>
>>>> Yes, this is exactly what I had in mind. There should be almost no
>>>> overhead as sink can be easily chained with your join
>>>> (KeyedCoProcessFunction) function.
>>>>
>>>>> The shape of the elements being evicted from the process function
>>>>> (Is a simple wrapper with the configuration for the sink enough here? Do I
>>>>> need to explicitly initialize the sink within this function? Etc.)
>>>>
>>>> To write an element you need a configuration for the destination
>>>> and the element itself, so a tuple of *(ElasticConfiguration,
>>>> Element)* should be enough (that's basically your MessageWrapper<ElementT,
>>>> ConfigurationT> class).
>>>>
>>>>> The actual use of the *DynamicElasticsearchSink* class (Would it
>>>>> just be something like an *.addSink(DynamicElasticsearchSink<String,
>>>>> Configuration>())* or perhaps something else entirely?)
>>>>
>>>> I guess it could look something like the snippet below. It would
>>>> definitely be good to play around with the *DynamicElasticsearchSink* API
>>>> and make it more meaningful / user friendly (the gist I've shared was just
>>>> a very rough prototype to showcase the idea).
>>>>
>>>>
>>>> static class Destination {
>>>>
>>>>     private final List<HttpHost> httpHosts;
>>>>
>>>>     Destination(List<HttpHost> httpHosts) {
>>>>         this.httpHosts = httpHosts;
>>>>     }
>>>> }
>>>>
>>>> final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>> toWrite.addSink(
>>>>         new DynamicElasticsearchSink<>(
>>>>                 new SinkRouter<
>>>>                         Tuple2<Destination, String>,
>>>>                         String,
>>>>                         ElasticsearchSink<Tuple2<Destination, String>>>() {
>>>>
>>>>                     @Override
>>>>                     public String getRoute(Tuple2<Destination, String> element) {
>>>>                         // Construct a deterministic unique caching key for the
>>>>                         // destination... (this could be cheaper if you know the data)
>>>>                         return element.f0.httpHosts.stream()
>>>>                                 .map(HttpHost::toHostString)
>>>>                                 .collect(Collectors.joining(","));
>>>>                     }
>>>>
>>>>                     @Override
>>>>                     public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>>>                             String cacheKey, Tuple2<Destination, String> element) {
>>>>                         return new ElasticsearchSink.Builder<>(
>>>>                                         element.f0.httpHosts,
>>>>                                         (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>>>                                                 (el, ctx, indexer) -> {
>>>>                                                     // Construct index request.
>>>>                                                     final IndexRequest request = ...;
>>>>                                                     indexer.add(request);
>>>>                                                 })
>>>>                                 .build();
>>>>                     }
>>>>                 }));
>>>>
>>>>
>>>> I hope this helps ;)
>>>>
>>>> Best,
>>>> D.
>>>>
>>>>
>>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <ri...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>>
>>>>> Since this will vary depending on the elements retrieved from a
>>>>> separate stream, I'm guessing something like the following would be
>>>>> roughly the avenue to continue down:
>>>>>
>>>>> fun main(args: Array<String>) {
>>>>>     val parameters = mergeParametersFromProperties(args)
>>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>
>>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>>     val connectionStream = stream
>>>>>         .fromSource(
>>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>             "elastic-configs"
>>>>>         )
>>>>>
>>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>>     stream
>>>>>         .fromSource(
>>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>             "messages"
>>>>>         )
>>>>>         .keyBy { message ->
>>>>>             // Key by the tenant in the message
>>>>>             message.getTenant()
>>>>>         }
>>>>>         .connect(
>>>>>             // Connect the messages stream with the configurations
>>>>>             connectionStream
>>>>>         )
>>>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>>>             // For this key, we need to store all of the previous messages in state
>>>>>             // in the case where we don't have a given mapping for this tenant yet
>>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>>             lateinit var configState: ValueState<String>
>>>>>
>>>>>             override fun open(parameters: Configuration) {
>>>>>                 super.open(parameters)
>>>>>                 // Initialize the states
>>>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>>             }
>>>>>
>>>>>             // When an element is received
>>>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>>>                 // Check if we have a mapping
>>>>>                 if (configState.value() == null){
>>>>>                     // We don't have a mapping for this tenant, store messages until we get it
>>>>>                     messagesAwaitingConfigState.add(message)
>>>>>                 }
>>>>>                 else {
>>>>>                     // Output the record with some indicator of the route?
>>>>>                     out.collect(message)
>>>>>                 }
>>>>>             }
>>>>>
>>>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>>>                 // If this mapping is for this specific tenant, store it and flush the pending
>>>>>                 // records in state
>>>>>                 if (config.getTenant() == context.currentKey){
>>>>>                     configState.update(config)
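>>>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>>>                     messagesToFlush.forEach { message ->
>>>>>                         out.collect(message)
>>>>>                     }
>>>>>                     // Clear the buffer so flushed messages are not emitted again
>>>>>                     messagesAwaitingConfigState.clear()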
>>>>>                 }
>>>>>             }
>>>>>
>>>>>             // State descriptors
>>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>>                 "messages-awaiting-config",
>>>>>                 TypeInformation.of(String::class.java)
>>>>>             )
>>>>>
>>>>>             val configStateDesc = ValueStateDescriptor(
>>>>>                 "elastic-config",
>>>>>                 TypeInformation.of(String::class.java)
>>>>>             )
>>>>>         })
>>>>>
>>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>>> }
>>>>>
>>>>> Basically, connect my tenant-specific configuration stream with my
>>>>> incoming messages (keyed by tenant) and buffer them until I have a
>>>>> corresponding configuration (to avoid race-conditions). However, I'm
>>>>> guessing what will happen here is rather than directly outputting the
>>>>> messages from this process function, I'd construct some type of wrapper
>>>>> here with the necessary routing/configuration for the message (obtained via
>>>>> the configuration stream) along with the element, which might be something
>>>>> like a MessageWrapper<ElementT, ConfigurationT> and pass those
>>>>> elements to the sink, which would create the tenant-specific Elastic
>>>>> connection from the ConfigurationT element and handle caching it and
>>>>> then just grab the element and send it on its way?
>>>>>
>>>>> Those are really the only bits I'm stuck on at the moment:
>>>>>
>>>>>    1. The shape of the elements being evicted from the process
>>>>>    function (Is a simple wrapper with the configuration for the sink enough
>>>>>    here? Do I need to explicitly initialize the sink within this function?
>>>>>    Etc.)
>>>>>    2. The actual use of the DynamicElasticsearchSink class (Would it
>>>>>    just be something like an .addSink(DynamicElasticSearchSink<String,
>>>>>    Configuration>()) or perhaps something else entirely?)
>>>>>
>>>>> Thanks again so much for the advice thus far David, it's greatly
>>>>> appreciated.
>>>>>
>>>>> Rion
>>>>>
>>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <dm...@apache.org> wrote:
>>>>>
>>>>>> To give you a better idea, at a high level I think it could look something
>>>>>> like this
>>>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8> [1].
>>>>>>
>>>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>>
>>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <ri...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> Thanks for your response! I think there are currently quite a few
>>>>>>> unknowns on my end in terms of what production loads look like, but I
>>>>>>> think the number of clusters shouldn’t be too large (and will either rarely
>>>>>>> change or have new entries come in at runtime, but it needs to support
>>>>>>> that).
>>>>>>>
>>>>>>> I think the dynamic approach might be a good route to explore with
>>>>>>> actual changes to the Elasticsearch sink as a longer term option. I’m not
>>>>>>> sure what the dynamic one would look like at the moment though, perhaps
>>>>>>> that’s something you’d be able to advise on?
>>>>>>>
>>>>>>> Given that all the records are keyed for a given tenant and I would
>>>>>>> have the mappings stored in state, is it possible that within the open()
>>>>>>> function for this dynamic route to access the state and initialize the
>>>>>>> client there? Or maybe there’s some other approach (such as grouping by
>>>>>>> clusters and dynamically handling indices)?
>>>>>>>
>>>>>>> I’d be happy to give a shot at making the appropriate changes to the
>>>>>>> sink as well, although I’m far from an Elastic expert. If you point me in
>>>>>>> the right direction, I may be able to help out.
>>>>>>>
>>>>>>> Thanks much!
>>>>>>>
>>>>>>> Rion
>>>>>>>
>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <dm...@apache.org> wrote:
>>>>>>>
>>>>>>>
>>>>>>> Hi Rion,
>>>>>>>
>>>>>>> As you probably already know, for dynamic indices, you can simply
>>>>>>> implement your own ElasticsearchSinkFunction
>>>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>>>> [1], where you can create any request that elastic client supports.
>>>>>>>
>>>>>>> The tricky part is how to implement dynamic routing into multiple
>>>>>>> clusters.
>>>>>>> - If the elastic clusters are known upfront (before submitting job),
>>>>>>> you can easily create multiple elastic sinks and prepend them with a simple
>>>>>>> filter (this is basically what split operator does).
>>>>>>> - If you discover Elastic clusters at runtime, this would require
>>>>>>> some changes to the current ElasticsearchSink implementation. I think this
>>>>>>> may actually be as simple as introducing something like
>>>>>>> DynamicElasticsearchSink, that could dynamically create and manage "child"
>>>>>>> sinks. This would probably require some thought about how to manage
>>>>>>> consumed resources (memory), because the number of child sinks could
>>>>>>> potentially be unbounded. This could of course be simplified if the underlying
>>>>>>> elastic client already supports that, which I'm not aware of. If you'd like
>>>>>>> to take this path, it would definitely be a great contribution to Flink
>>>>>>> (I'm able to provide some guidance).
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>>
>>>>>>> Best,
>>>>>>> D.
>>>>>>>
>>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <ri...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi folks,
>>>>>>>>
>>>>>>>> I have a use-case that I wanted to initially pose to the mailing
>>>>>>>> list as I’m not terribly familiar with the Elasticsearch connector to
>>>>>>>> ensure I’m not going down the wrong path trying to accomplish this in Flink
>>>>>>>> (or if something downstream might be a better option).
>>>>>>>>
>>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>>
>>>>>>>>    - A stream of tenant-specific events
>>>>>>>>    - An HTTP endpoint containing mappings for tenant-specific
>>>>>>>>    Elastic cluster information (as each tenant has its own specific Elastic
>>>>>>>>    cluster/index)
>>>>>>>>
>>>>>>>> What I’m hoping to accomplish is the following:
>>>>>>>>
>>>>>>>>    1. One stream will periodically poll the HTTP endpoint and
>>>>>>>>    store these cluster mappings in state (keyed by tenant with cluster info as
>>>>>>>>    the value)
>>>>>>>>    2. The event stream will be keyed by tenant and connected to
>>>>>>>>    the cluster mappings stream.
>>>>>>>>    3. I’ll need an Elasticsearch sink that can route the
>>>>>>>>    tenant-specific event data to its corresponding cluster/index from the
>>>>>>>>    mapping source.
>>>>>>>>
>>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>>> indices, however I didn’t know if it’s possible to adjust the cluster like
>>>>>>>> I would need on a per-tenant basis or if there’s a better approach here?
>>>>>>>>
>>>>>>>> Any advice would be appreciated.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Rion
>>>>>>>>
>>>>>>>

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

Posted by Rion Williams <ri...@gmail.com>.
Hi David,

That was perfect and it looks like this is working as I'd expected. I put
together some larger integration tests for my specific use-case (multiple
Elasticsearch clusters running in TestContainers) and verified that
messages were being routed dynamically to the appropriate sinks. I forked
the Flink repo last night and was trying to figure out the best place to
start adding these classes in (I noticed that there were three separate ES
packages targeting 5/6/7 respectively). I was going to try to start
fleshing out the initial implementation for this, but wanted to make sure that
I was starting in the right place.
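
On the resource concern raised earlier in the thread (the number of child
sinks could be potentially unbounded), one possible direction is a
size-bounded cache that closes the least recently used child sink on
eviction. The block below is only a plain-Java sketch with no Flink
dependency: ChildSink, BoundedSinkCache and the capacity value are
hypothetical names for illustration, not part of the Elasticsearch connector.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a per-cluster child sink; NOT a Flink class.
class ChildSink implements AutoCloseable {
    final String route;
    boolean closed = false;

    ChildSink(String route) {
        this.route = route;
    }

    @Override
    public void close() {
        // A real child sink would flush pending requests before closing.
        closed = true;
    }
}

// Hypothetical LRU cache: closes the least recently used sink once capacity is exceeded.
class BoundedSinkCache {
    final List<String> evictedRoutes = new ArrayList<>();
    private final LinkedHashMap<String, ChildSink> sinks;

    BoundedSinkCache(int maxSinks) {
        // accessOrder = true keeps entries ordered from least to most recently used.
        this.sinks = new LinkedHashMap<String, ChildSink>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, ChildSink> eldest) {
                if (size() > maxSinks) {
                    eldest.getValue().close();
                    evictedRoutes.add(eldest.getKey());
                    return true;
                }
                return false;
            }
        };
    }

    ChildSink getOrCreate(String route, Function<String, ChildSink> factory) {
        ChildSink sink = sinks.get(route); // get() also refreshes recency
        if (sink == null) {
            sink = factory.apply(route);
            sinks.put(route, sink); // may evict and close the eldest entry
        }
        return sink;
    }

    int size() {
        return sinks.size();
    }
}
```

With a capacity of 2, touching route "a", then "b", then "a" again, and
finally "c" would evict and close the sink for "b". Whether eviction is
acceptable at all depends on how pending writes and checkpoints are
handled, which this sketch does not address.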

Additionally, do you know of anything that might be similar to this even
within other sinks? Just trying to think of something to model this after.
Once I get things started, I'll spin up a JIRA issue for it and go from
there.

Thanks so much for your help!

Rion

On Tue, Aug 24, 2021 at 1:45 AM David Morávek <dm...@apache.org> wrote:

> Hi Rion,
>
> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
> before opening the child sink. Please see *AbstractRichFunction* [1]
> (that ElasticsearchSink extends) for more details.
>
> One more note: instead of starting with an integration test, I'd recommend
> writing a unit test using *operator test harness* [2] first. This should
> help you to discover / debug many issues upfront. You can use
> *ElasticsearchSinkBaseTest* [3] as an example.
>
> [1]
> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
> [3]
> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>
> Best,
> D.
>
> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <ri...@gmail.com>
> wrote:
>
>> Hi David,
>>
>> Thanks again for the response, I believe that I'm getting pretty close
>> for at least a POC-level implementation of this. Currently, I'm working
>> with JsonObject instances throughout the pipeline, so I wanted to try this
>> out and simply stored the routing information within the element itself for
>> simplicity's sake right now, so it has a shape that looks something like
>> this:
>>
>> {
>>     "route": {
>>         "hosts": "...",
>>         "index": "...",
>>         ...
>>     },
>>     "all-other-fields-here"
>> }
>>
>> And I've stripped back several of the layers of the routers (since I
>> already have all of the information in the element at that point). I tried
>> using something like this:
>>
>> class DynamicElasticsearchSink: RichSinkFunction<JsonObject>(), CheckpointedFunction {
>>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>     private lateinit var configuration: Configuration
>>
>>     override fun open(parameters: Configuration) {
>>         configuration = parameters
>>     }
>>
>>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>>         val route = getHost(element)
>>         // Check if we already have a router for this cluster
>>         var sink = sinkRoutes[route]
>>         if (sink == null) {
>>             // If not, create one
>>             sink = buildSinkFromRoute(element)
>>             sink.open(configuration)
>>             sinkRoutes[route] = sink
>>         }
>>
>>         sink.invoke(element, context)
>>     }
>>
>>     override fun initializeState(context: FunctionInitializationContext) {
>>         // No-op.
>>     }
>>
>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>         // This is used only to flush pending writes.
>>         for (sink in sinkRoutes.values) {
>>             sink.snapshotState(context)
>>         }
>>     }
>>
>>     override fun close() {
>>         for (sink in sinkRoutes.values) {
>>             sink.close()
>>         }
>>     }
>>
>>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>             buildHostsFromElement(element),
>>             ElasticsearchRoutingFunction()
>>         )
>>
>>         builder.setBulkFlushMaxActions(1)
>>
>>         // TODO: Configure authorization if available
>> //        builder.setRestClientFactory { restClient ->
>> //            restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
>> //                override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>> //                    // Configure authorization here
>> //                    val credentialsProvider = BasicCredentialsProvider().apply {
>> //                        setCredentials(
>> //                            AuthScope.ANY,
>> //                            UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
>> //                        )
>> //                    }
>> //
>> //                    return builder.setDefaultCredentialsProvider(credentialsProvider);
>> //                }
>> //            })
>> //        }
>>
>>         return builder.build()
>>     }
>>
>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost>{
>>         val transportAddresses = element
>>             .get("route").asJsonObject
>>             .get("hosts").asString
>>
>>         // If there are multiple, they should be comma-delimited
>>         val addresses = transportAddresses.split(",")
>>         return addresses
>>             .filter { address -> address.isNotEmpty() }
>>             .map { address ->
>>                 HttpHost.create(address)
>>             }
>>     }
>>
>>     private fun getHost(element: JsonObject): String {
>>         return element
>>             .get("route").asJsonObject
>>             .get("hosts").asString
>>     }
>>
>>     private class ElasticsearchRoutingFunction: ElasticsearchSinkFunction<JsonObject> {
>>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>>             indexer.add(request(element))
>>         }
>>
>>         private fun request(element: JsonObject): IndexRequest {
>>             // Access routing information
>>             val index = element
>>                 .get("route").asJsonObject
>>                 .get("index").asString
>>
>>             // Strip off routing information
>>             element.remove("route")
>>
>>             // Send the request
>>             return Requests.indexRequest()
>>                 .index(index)
>>                 .type("_doc")
>>                 .source(mapOf(
>>                     "data" to "$element"
>>                 ))
>>         }
>>     }
>> }
>>
>> After running an integration test, I keep running into the
>> following error during the invocation of the child sink:
>>
>> // The runtime context has not been initialized.
>> sink.invoke(element, context)
>>
>> I can see the underlying sink getting initialized, the open call being
>> made, etc. However, for some reason it looks like there's an issue related
>> to the context during the invoke call, namely *"The runtime context has
>> not been initialized"*. I had assumed this would be alright since the
>> context for the "wrapper" should have already been initialized, but maybe
>> there's something that I'm missing.
>>
>> Also, please forgive any hastily written or nasty code as this is purely
>> a POC to see if I could get this to work using a single object. I have the
>> hopes of cleaning it up and genericizing it after I am confident that it
>> actually works.
>>
>> Thanks so much again,
>>
>> Rion
>>
>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <dm...@apache.org> wrote:
>>
>>> Hi Rion,
>>>
>>> Sorry for the late reply, I missed your previous message. Thanks Arvid
>>> for the reminder <3.
>>>
>>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass
>>>> those elements to the sink, which would create the tenant-specific Elastic
>>>> connection from the ConfigurationT element and handle caching it and
>>>> then just grab the element and send it on its way?
>>>
>>>
>>> Yes, this is exactly what I had in mind. There should be almost no
>>> overhead as sink can be easily chained with your join
>>> (KeyedCoProcessFunction) function.
>>>
>>>> The shape of the elements being evicted from the process function
>>>> (Is a simple wrapper with the configuration for the sink enough here? Do I
>>>> need to explicitly initialize the sink within this function? Etc.)
>>>
>>> To write an element you need a configuration for the destination
>>> and the element itself, so a tuple of *(ElasticConfiguration,
>>> Element)* should be enough (that's basically your MessageWrapper<ElementT,
>>> ConfigurationT> class).
>>>
>>>> The actual use of the *DynamicElasticsearchSink* class (Would it
>>>> just be something like an *.addSink(DynamicElasticSearchSink<String,
>>>> Configuration>())* or perhaps something else entirely?)
>>>
>>>
>>> I guess it could look something like the snippet below. It would be
>>> definitely good to play around with the *DynamicElasticSearchSink* API
>>> and make it more meaningful / user friendly (the gist I've shared was just
>>> a very rough prototype to showcase the idea).
>>>
>>>
>>> static class Destination {
>>>
>>>     private final List<HttpHost> httpHosts;
>>>
>>>     Destination(List<HttpHost> httpHosts) {
>>>         this.httpHosts = httpHosts;
>>>     }
>>> }
>>>
>>> final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>> toWrite.addSink(
>>>         new DynamicElasticsearchSink<>(
>>>                 new SinkRouter<
>>>                         Tuple2<Destination, String>,
>>>                         String,
>>>                         ElasticsearchSink<Tuple2<Destination, String>>>() {
>>>
>>>                     @Override
>>>                     public String getRoute(Tuple2<Destination, String> element) {
>>>                         // Construct a deterministic unique caching key for the
>>>                         // destination... (this could be cheaper if you know the data)
>>>                         return element.f0.httpHosts.stream()
>>>                                 .map(HttpHost::toHostString)
>>>                                 .collect(Collectors.joining(","));
>>>                     }
>>>
>>>                     @Override
>>>                     public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>>                             String cacheKey, Tuple2<Destination, String> element) {
>>>                         return new ElasticsearchSink.Builder<>(
>>>                                         element.f0.httpHosts,
>>>                                         (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>>                                                 (el, ctx, indexer) -> {
>>>                                                     // Construct index request.
>>>                                                     final IndexRequest request = ...;
>>>                                                     indexer.add(request);
>>>                                                 })
>>>                                 .build();
>>>                     }
>>>                 }));
>>>
>>>
>>> I hope this helps ;)
>>>
>>> Best,
>>> D.
>>>
>>>
>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <ri...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>
>>>> Since this will vary depending on the elements retrieved from a
>>>> separate stream, I'm guessing something like the following would be
>>>> roughly the avenue to continue down:
>>>>
>>>> fun main(args: Array<String>) {
>>>>     val parameters = mergeParametersFromProperties(args)
>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>
>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>     val connectionStream = stream
>>>>         .fromSource(
>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>             WatermarkStrategy.noWatermarks(),
>>>>             "elastic-configs"
>>>>         )
>>>>
>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>     stream
>>>>         .fromSource(
>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>             WatermarkStrategy.noWatermarks(),
>>>>             "messages"
>>>>         )
>>>>         .keyBy { message ->
>>>>             // Key by the tenant in the message
>>>>             message.getTenant()
>>>>         }
>>>>         .connect(
>>>>             // Connect the messages stream with the configurations
>>>>             connectionStream
>>>>         )
>>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>>             // For this key, we need to store all of the previous messages in state
>>>>             // in the case where we don't have a given mapping for this tenant yet
>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>             lateinit var configState: ValueState<String>
>>>>
>>>>             override fun open(parameters: Configuration) {
>>>>                 super.open(parameters)
>>>>                 // Initialize the states
>>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>             }
>>>>
>>>>             // When an element is received
>>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>>                 // Check if we have a mapping
>>>>                 if (configState.value() == null){
>>>>                     // We don't have a mapping for this tenant, store messages until we get it
>>>>                     messagesAwaitingConfigState.add(message)
>>>>                 }
>>>>                 else {
>>>>                     // Output the record with some indicator of the route?
>>>>                     out.collect(message)
>>>>                 }
>>>>             }
>>>>
>>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>>                 // If this mapping is for this specific tenant, store it and flush the pending
>>>>                 // records in state
>>>>                 if (config.getTenant() == context.currentKey){
>>>>                     configState.update(config)
>>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>>                     messagesToFlush.forEach { message ->
>>>>                         out.collect(message)
>>>>                     }
>>>>                 }
>>>>             }
>>>>
>>>>             // State descriptors
>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>                 "messages-awaiting-config",
>>>>                 TypeInformation.of(String::class.java)
>>>>             )
>>>>
>>>>             val configStateDesc = ValueStateDescriptor(
>>>>                 "elastic-config",
>>>>                 TypeInformation.of(String::class.java)
>>>>             )
>>>>         })
>>>>
>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>> }
>>>>
>>>> Basically, connect my tenant-specific configuration stream with my
>>>> incoming messages (keyed by tenant) and buffer them until I have a
>>>> corresponding configuration (to avoid race-conditions). However, I'm
>>>> guessing what will happen here is rather than directly outputting the
>>>> messages from this process function, I'd construct some type of wrapper
>>>> here with the necessary routing/configuration for the message (obtained via
>>>> the configuration stream) along with the element, which might be something
>>>> like a MessageWrapper<ElementT, ConfigurationT> and pass those
>>>> elements to the sink, which would create the tenant-specific Elastic
>>>> connection from the ConfigurationT element and handle caching it and
>>>> then just grab the element and send it on its way?
>>>>
>>>> Those are really the only bits I'm stuck on at the moment:
>>>>
>>>>    1. The shape of the elements being evicted from the process
>>>>    function (Is a simple wrapper with the configuration for the sink enough
>>>>    here? Do I need to explicitly initialize the sink within this function?
>>>>    Etc.)
>>>>    2. The actual use of the DynamicElasticsearchSink class (Would it
>>>>    just be something like an .addSink(DynamicElasticSearchSink<String,
>>>>    Configuration>()) or perhaps something else entirely?)
>>>>
>>>> Thanks again so much for the advice thus far David, it's greatly
>>>> appreciated.
>>>>
>>>> Rion
>>>>
>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <dm...@apache.org> wrote:
>>>>
>>>>> To give you a better idea, at a high level I think it could look something
>>>>> like this
>>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8> [1].
>>>>>
>>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>
>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <ri...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi David,
>>>>>>
>>>>>> Thanks for your response! I think there are currently quite a few
>>>>>> unknowns on my end in terms of what production loads look like, but I
>>>>>> think the number of clusters shouldn’t be too large (and will either rarely
>>>>>> change or have new entries come in at runtime, but it needs to support
>>>>>> that).
>>>>>>
>>>>>> I think the dynamic approach might be a good route to explore with
>>>>>> actual changes to the Elasticsearch sink as a longer term option. I’m not
>>>>>> sure what the dynamic one would look like at the moment though, perhaps
>>>>>> that’s something you’d be able to advise on?
>>>>>>
>>>>>> Given that all the records are keyed for a given tenant and I would
>>>>>> have the mappings stored in state, is it possible, within the open()
>>>>>> function for this dynamic route, to access the state and initialize the
>>>>>> client there? Or maybe there’s some other approach (such as grouping by
>>>>>> clusters and dynamically handling indices)?
>>>>>>
>>>>>> I’d be happy to give a shot at making the appropriate changes to the
>>>>>> sink as well, although I’m far from an Elastic expert. If you point me in
>>>>>> the right direction, I may be able to help out.
>>>>>>
>>>>>> Thanks much!
>>>>>>
>>>>>> Rion
>>>>>>
>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <dm...@apache.org> wrote:
>>>>>>
>>>>>>
>>>>>> Hi Rion,
>>>>>>
>>>>>> As you probably already know, for dynamic indices, you can simply
>>>>>> implement your own ElasticsearchSinkFunction
>>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>>> [1], where you can create any request that the Elastic client supports.
>>>>>>
>>>>>> The tricky part is how to implement dynamic routing into multiple
>>>>>> clusters.
>>>>>> - If the Elastic clusters are known upfront (before submitting the job),
>>>>>> you can easily create multiple Elastic sinks and prepend them with a simple
>>>>>> filter (this is basically what the split operator does).
>>>>>> - If you discover Elastic clusters at runtime, this would require
>>>>>> some changes to the current ElasticsearchSink implementation. I think this
>>>>>> may actually be as simple as introducing something like
>>>>>> DynamicElasticsearchSink, which could dynamically create and manage "child"
>>>>>> sinks. This would probably require some thought about how to manage
>>>>>> consumed resources (memory), because the number of child sinks could be
>>>>>> potentially unbounded. This could of course be simplified if the underlying
>>>>>> Elastic client already supports that, which I'm not aware of. If you'd like
>>>>>> to take this path, it would definitely be a great contribution to Flink
>>>>>> (I'm able to provide some guidance).
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>
>>>>>> Best,
>>>>>> D.
>>>>>>
>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <ri...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi folks,
>>>>>>>
>>>>>>> I have a use-case that I wanted to initially pose to the mailing
>>>>>>> list as I’m not terribly familiar with the Elasticsearch connector to
>>>>>>> ensure I’m not going down the wrong path trying to accomplish this in Flink
>>>>>>> (or if something downstream might be a better option).
>>>>>>>
>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>
>>>>>>>    - A stream of tenant-specific events
>>>>>>>    - An HTTP endpoint containing mappings for tenant-specific
>>>>>>>    Elastic cluster information (as each tenant has its own specific Elastic
>>>>>>>    cluster/index)
>>>>>>>
>>>>>>> What I’m hoping to accomplish is the following:
>>>>>>>
>>>>>>>    1. One stream will periodically poll the HTTP endpoint and store
>>>>>>>    these cluster mappings in state (keyed by tenant with cluster info as the
>>>>>>>    value)
>>>>>>>    2. The event stream will be keyed by tenant and connected to the
>>>>>>>    cluster mappings stream.
>>>>>>>    3. I’ll need an Elasticsearch sink that can route the
>>>>>>>    tenant-specific event data to its corresponding cluster/index from the
>>>>>>>    mapping source.
>>>>>>>
>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>> indices, however I didn’t know if it’s possible to adjust the cluster like
>>>>>>> I would need on a per-tenant basis or if there’s a better approach here?
>>>>>>>
>>>>>>> Any advice would be appreciated.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Rion
>>>>>>>
>>>>>>

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

Posted by David Morávek <dm...@apache.org>.
Hi Rion,

you just need to call *sink.setRuntimeContext(getRuntimeContext())* before
opening the child sink. Please see *AbstractRichFunction* [1] (that
ElasticsearchSink extends) for more details.
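
To make the ordering concrete, here is a minimal plain-Java sketch of the
lifecycle. It does not use Flink at all: FakeRuntimeContext, FakeRichSink,
FakeChildSink and FakeDynamicSink are hypothetical, simplified stand-ins
that only mimic the "runtime context has not been initialized" check, and
the "route|payload" element format is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a runtime context; NOT Flink's RuntimeContext.
class FakeRuntimeContext {
    final String taskName;

    FakeRuntimeContext(String taskName) {
        this.taskName = taskName;
    }
}

// Hypothetical stand-in that mimics the rich-function context check.
abstract class FakeRichSink<T> {
    private FakeRuntimeContext runtimeContext;

    void setRuntimeContext(FakeRuntimeContext ctx) {
        this.runtimeContext = ctx;
    }

    FakeRuntimeContext getRuntimeContext() {
        if (runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return runtimeContext;
    }

    void open() {
        // No-op in this sketch.
    }

    abstract void invoke(T value);
}

// Child sink that needs its own runtime context whenever it is invoked.
class FakeChildSink extends FakeRichSink<String> {
    final List<String> written = new ArrayList<>();

    @Override
    void invoke(String value) {
        written.add(getRuntimeContext().taskName + ":" + value);
    }
}

// Wrapper that creates child sinks lazily, one per route.
class FakeDynamicSink extends FakeRichSink<String> {
    final Map<String, FakeChildSink> children = new HashMap<>();

    @Override
    void invoke(String value) {
        // Assumed element format for this sketch: "<route>|<payload>".
        String route = value.substring(0, value.indexOf('|'));
        FakeChildSink child = children.get(route);
        if (child == null) {
            child = new FakeChildSink();
            // Hand the wrapper's context to the child BEFORE opening it.
            child.setRuntimeContext(getRuntimeContext());
            child.open();
            children.put(route, child);
        }
        child.invoke(value);
    }
}
```

Without the setRuntimeContext call on the child, the first invoke on it
fails with the same message as in the report above, even though the
wrapper itself was initialized correctly.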

One more note: instead of starting with an integration test, I'd recommend
writing a unit test using *operator test harness* [2] first. This should
help you to discover / debug many issues upfront. You can use
*ElasticsearchSinkBaseTest* [3] as an example.

[1]
https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
[3]
https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java

Best,
D.

On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <ri...@gmail.com>
wrote:

> Hi David,
>
> Thanks again for the response, I believe that I'm getting pretty close for
> at least a POC-level implementation of this. Currently, I'm working with
> JsonObject instances throughout the pipeline, so I wanted to try this out
> and simply stored the routing information within the element itself for
> simplicity's sake right now, so it has a shape that looks something like
> this:
>
> {
>     "route": {
>         "hosts": "...",
>         "index": "...",
>         ...
>     },
>     "all-other-fields-here"
> }
>
> And I've stripped back several of the layers of the routers (since I
> already have all of the information in the element at that point). I tried
> using something like this:
>
> class DynamicElasticsearchSink: RichSinkFunction<JsonObject>(), CheckpointedFunction {
>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>     private lateinit var configuration: Configuration
>
>     override fun open(parameters: Configuration) {
>         configuration = parameters
>     }
>
>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>         val route = getHost(element)
>         // Check if we already have a router for this cluster
>         var sink = sinkRoutes[route]
>         if (sink == null) {
>             // If not, create one
>             sink = buildSinkFromRoute(element)
>             sink.open(configuration)
>             sinkRoutes[route] = sink
>         }
>
>         sink.invoke(element, context)
>     }
>
>     override fun initializeState(context: FunctionInitializationContext) {
>         // No-op.
>     }
>
>     override fun snapshotState(context: FunctionSnapshotContext) {
>         // This is used only to flush pending writes.
>         for (sink in sinkRoutes.values) {
>             sink.snapshotState(context)
>         }
>     }
>
>     override fun close() {
>         for (sink in sinkRoutes.values) {
>             sink.close()
>         }
>     }
>
>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>         val builder = ElasticsearchSink.Builder<JsonObject>(
>             buildHostsFromElement(element),
>             ElasticsearchRoutingFunction()
>         )
>
>         builder.setBulkFlushMaxActions(1)
>
>         // TODO: Configure authorization if available
> //        builder.setRestClientFactory { restClient ->
> //            restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
> //                override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
> //                    // Configure authorization here
> //                    val credentialsProvider = BasicCredentialsProvider().apply {
> //                        setCredentials(
> //                            AuthScope.ANY,
> //                            UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
> //                        )
> //                    }
> //
> //                    return builder.setDefaultCredentialsProvider(credentialsProvider);
> //                }
> //            })
> //        }
>
>         return builder.build()
>     }
>
>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost>{
>         val transportAddresses = element
>             .get("route").asJsonObject
>             .get("hosts").asString
>
>         // If there are multiple, they should be comma-delimited
>         val addresses = transportAddresses.split(",")
>         return addresses
>             .filter { address -> address.isNotEmpty() }
>             .map { address ->
>                 HttpHost.create(address)
>             }
>     }
>
>     private fun getHost(element: JsonObject): String {
>         return element
>             .get("route").asJsonObject
>             .get("hosts").asString
>     }
>
>     private class ElasticsearchRoutingFunction: ElasticsearchSinkFunction<JsonObject> {
>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>             indexer.add(request(element))
>         }
>
>         private fun request(element: JsonObject): IndexRequest {
>             // Access routing information
>             val index = element
>                 .get("route").asJsonObject
>                 .get("index").asString
>
>             // Strip off routing information
>             element.remove("route")
>
>             // Send the request
>             return Requests.indexRequest()
>                 .index(index)
>                 .type("_doc")
>                 .source(mapOf(
>                     "data" to "$element"
>                 ))
>         }
>     }
> }
>
> After running an integration test, I keep running into the
> following error during the invocation of the child sink:
>
> // The runtime context has not been initialized.
> sink.invoke(element, context)
>
> I can see the underlying sink getting initialized, the open call being
> made, etc. However, for some reason it looks like there's an issue related
> to the context during the invoke call, namely *"The runtime context has
> not been initialized"*. I had assumed this would be alright since the
> context for the "wrapper" should have already been initialized, but maybe
> there's something that I'm missing.
>
> Also, please forgive any hastily written or nasty code as this is purely a
> POC to see if I could get this to work using a single object. I have the
> hopes of cleaning it up and genericizing it after I am confident that it
> actually works.
>
> Thanks so much again,
>
> Rion
>
> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <dm...@apache.org> wrote:
>
>> Hi Rion,
>>
>> Sorry for the late reply, I missed your previous message. Thanks Arvid for
>> the reminder <3.
>>
>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass those
>>> elements to the sink, which would create the tenant-specific Elastic
>>> connection from the ConfigurationT element and handle caching it and
>>> then just grab the element and send it on its way?
>>
>>
>> Yes, this is exactly what I had in mind. There should be almost no
>> overhead as sink can be easily chained with your join
>> (KeyedCoProcessFunction) function.
>>
>>> The shape of the elements being evicted from the process function
>>> (Is a simple wrapper with the configuration for the sink enough here? Do I
>>> need to explicitly initialize the sink within this function? Etc.)
>>
>> To write an element you need a configuration for the destination
>> and the element itself, so a tuple of *(ElasticConfiguration,
>> Element)* should be enough (that's basically your MessageWrapper<ElementT,
>> ConfigurationT> class).
>>
>>> The actual use of the *DynamicElasticsearchSink* class (Would it
>>> just be something like an *.addSink(DynamicElasticSearchSink<String,
>>> Configuration>())* or perhaps something else entirely?)
>>
>>
>> I guess it could look something like the snippet below. It would be
>> definitely good to play around with the *DynamicElasticSearchSink* API
>> and make it more meaningful / user friendly (the gist I've shared was just
>> a very rough prototype to showcase the idea).
>>
>>
>> static class Destination {
>>
>>     private final List<HttpHost> httpHosts;
>>
>>     Destination(List<HttpHost> httpHosts) {
>>         this.httpHosts = httpHosts;
>>     }
>> }
>>
>> final DataStream<Tuple2<Destination, String>> toWrite = ...;
>> toWrite.addSink(
>>         new DynamicElasticsearchSink<>(
>>                 new SinkRouter<
>>                         Tuple2<Destination, String>,
>>                         String,
>>                         ElasticsearchSink<Tuple2<Destination, String>>>() {
>>
>>                     @Override
>>                     public String getRoute(Tuple2<Destination, String> element) {
>>                         // Construct a deterministic unique caching key for the
>>                         // destination... (this could be cheaper if you know the data)
>>                         return element.f0.httpHosts.stream()
>>                                 .map(HttpHost::toHostString)
>>                                 .collect(Collectors.joining(","));
>>                     }
>>
>>                     @Override
>>                     public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>                             String cacheKey, Tuple2<Destination, String> element) {
>>                         return new ElasticsearchSink.Builder<>(
>>                                         element.f0.httpHosts,
>>                                         (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>                                                 (el, ctx, indexer) -> {
>>                                                     // Construct index request.
>>                                                     final IndexRequest request = ...;
>>                                                     indexer.add(request);
>>                                                 })
>>                                 .build();
>>                     }
>>                 }));
>>
>>
>> I hope this helps ;)
>>
>> Best,
>> D.
>>
>>
>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <ri...@gmail.com>
>> wrote:
>>
>>> Thanks for this suggestion David, it's extremely helpful.
>>>
>>> Since this will vary depending on the elements retrieved from a separate
>>> stream, I'm guessing something like the following would be roughly the
>>> avenue to continue down:
>>>
>>> fun main(args: Array<String>) {
>>>     val parameters = mergeParametersFromProperties(args)
>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>
>>>     // Get the stream for tenant-specific Elastic configurations
>>>     val connectionStream = stream
>>>         .fromSource(
>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>             WatermarkStrategy.noWatermarks(),
>>>             "elastic-configs"
>>>         )
>>>
>>>     // Get the stream of incoming messages to be routed to Elastic
>>>     stream
>>>         .fromSource(
>>>             KafkaSource.of(parameters, listOf("messages")),
>>>             WatermarkStrategy.noWatermarks(),
>>>             "messages"
>>>         )
>>>         .keyBy { message ->
>>>             // Key by the tenant in the message
>>>             message.getTenant()
>>>         }
>>>         .connect(
>>>             // Connect the messages stream with the configurations
>>>             connectionStream
>>>         )
>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>             // For this key, we need to store all of the previous messages in state
>>>             // in the case where we don't have a given mapping for this tenant yet
>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>             lateinit var configState: ValueState<String>
>>>
>>>             override fun open(parameters: Configuration) {
>>>                 super.open(parameters)
>>>                 // Initialize the states
>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>                 configState = runtimeContext.getState(configStateDesc)
>>>             }
>>>
>>>             // When an element is received
>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>                 // Check if we have a mapping
>>>                 if (configState.value() == null){
>>>                     // We don't have a mapping for this tenant, store messages until we get it
>>>                     messagesAwaitingConfigState.add(message)
>>>                 }
>>>                 else {
>>>                     // Output the record with some indicator of the route?
>>>                     out.collect(message)
>>>                 }
>>>             }
>>>
>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>                 // If this mapping is for this specific tenant, store it and flush the pending
>>>                 // records in state
>>>                 if (config.getTenant() == context.currentKey){
>>>                     configState.update(config)
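>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>                     messagesToFlush.forEach { message ->
>>>                         out.collect(message)
>>>                     }
>>>                     // Clear the buffer so flushed messages are not emitted again
>>>                     messagesAwaitingConfigState.clear()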
>>>                 }
>>>             }
>>>
>>>             // State descriptors
>>>             val awaitingStateDesc = ListStateDescriptor(
>>>                 "messages-awaiting-config",
>>>                 TypeInformation.of(String::class.java)
>>>             )
>>>
>>>             val configStateDesc = ValueStateDescriptor(
>>>                 "elastic-config",
>>>                 TypeInformation.of(String::class.java)
>>>             )
>>>         })
>>>
>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>> }
>>>
>>> Basically, connect my tenant-specific configuration stream with my
>>> incoming messages (keyed by tenant) and buffer them until I have a
>>> corresponding configuration (to avoid race-conditions). However, I'm
>>> guessing what will happen here is rather than directly outputting the
>>> messages from this process function, I'd construct some type of wrapper
>>> here with the necessary routing/configuration for the message (obtained via
>>> the configuration stream) along with the element, which might be something
>>> like a MessageWrapper<ElementT, ConfigurationT> and pass those elements
>>> to the sink, which would create the tenant-specific Elastic connection from
>>> the ConfigurationT element and handle caching it and then just grab the
>>> element and send it on its way?
>>>
>>> Those are really the only bits I'm stuck on at the moment:
>>>
>>>    1. The shape of the elements being evicted from the process function
>>>    (Is a simple wrapper with the configuration for the sink enough here? Do I
>>>    need to explicitly initialize the sink within this function? Etc.)
>>>    2. The actual use of the DynamicElasticsearchSink class (Would it
>>>    just be something like an .addSink(DynamicElasticsearchSink<String,
>>>    Configuration>()) or perhaps something else entirely?)
>>>
>>> Thanks again so much for the advice thus far David, it's greatly
>>> appreciated.
>>>
>>> Rion
>>>
>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <dm...@apache.org> wrote:
>>>
>>>> To give you a better idea, at a high level I think it could look something
>>>> like this
>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8> [1].
>>>>
>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>
>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <ri...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> Thanks for your response! I think there are currently quite a few
>>>>> unknowns on my end in terms of what production loads look like, but I
>>>>> think the number of clusters shouldn’t be too large (and will either rarely
>>>>> change or have new entries come in at runtime, but it needs to support
>>>>> that).
>>>>>
>>>>> I think the dynamic approach might be a good route to explore with
>>>>> actual changes to the Elasticsearch sink as a longer term option. I’m not
>>>>> sure what the dynamic one would look like at the moment though, perhaps
>>>>> that’s something you’d be able to advise on?
>>>>>
>>>>> Given that all the records are keyed for a given tenant and I would
>>>>> have the mappings stored in state, is it possible that within the open()
>>>>> function for this dynamic route to access the state and initialize the
>>>>> client there? Or maybe there’s some other approach (such as grouping by
>>>>> clusters and dynamically handling indices)?
>>>>>
>>>>> I’d be happy to give a shot at making the appropriate changes to the
>>>>> sink as well, although I’m far from an Elastic expert. If you point me in
>>>>> the right direction, I may be able to help out.
>>>>>
>>>>> Thanks much!
>>>>>
>>>>> Rion
>>>>>
>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <dm...@apache.org> wrote:
>>>>>
>>>>> 
>>>>> Hi Rion,
>>>>>
>>>>> As you probably already know, for dynamic indices, you can simply
>>>>> implement your own ElasticsearchSinkFunction
>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>> [1], where you can create any request that the elastic client supports.
>>>>>
>>>>> The tricky part is how to implement dynamic routing into multiple
>>>>> clusters.
>>>>> - If the elastic clusters are known upfront (before submitting the job),
>>>>> you can easily create multiple elastic sinks and put a simple filter in
>>>>> front of each one (this is basically what the split operator does).
>>>>> - If you discover elastic clusters at runtime, this would require
>>>>> some changes to the current ElasticsearchSink implementation. I think this
>>>>> may actually be as simple as introducing something like a
>>>>> DynamicElasticsearchSink that could dynamically create and manage "child"
>>>>> sinks. This would probably require some thought about how to manage
>>>>> consumed resources (memory), because the number of child sinks could
>>>>> potentially be unbounded. This could of course be simplified if the
>>>>> underlying elastic client already supports that, which I'm not aware of.
>>>>> If you'd like to take this path, it would definitely be a great
>>>>> contribution to Flink (I'm able to provide some guidance).
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>
>>>>> Best,
>>>>> D.
>>>>>
>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <ri...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi folks,
>>>>>>
>>>>>> I have a use-case that I wanted to initially pose to the mailing list
>>>>>> as I’m not terribly familiar with the Elasticsearch connector to ensure I’m
>>>>>> not going down the wrong path trying to accomplish this in Flink (or if
>>>>>> something downstream might be a better option).
>>>>>>
>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>
>>>>>>    - A stream of tenant-specific events
>>>>>>    - An HTTP endpoint containing mappings for tenant-specific
>>>>>>    Elastic cluster information (as each tenant has its own specific Elastic
>>>>>>    cluster/index)
>>>>>>
>>>>>> What I’m hoping to accomplish is the following:
>>>>>>
>>>>>>    1. One stream will periodically poll the HTTP endpoint and store
>>>>>>    these cluster mappings in state (keyed by tenant with cluster info as the
>>>>>>    value)
>>>>>>    2. The event stream will be keyed by tenant and connected to the
>>>>>>    cluster mappings stream.
>>>>>>    3. I’ll need an Elasticsearch sink that can route the
>>>>>>    tenant-specific event data to its corresponding cluster/index from the
>>>>>>    mapping source.
>>>>>>
>>>>>> I know that the existing Elasticsearch sink supports dynamic indices,
>>>>>> however I didn’t know if it’s possible to adjust the cluster like I would
>>>>>> need on a per-tenant basis or if there’s a better approach here?
>>>>>>
>>>>>> Any advice would be appreciated.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Rion
>>>>>>
>>>>>

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

Posted by Rion Williams <ri...@gmail.com>.
Hi David,

Thanks again for the response. I believe I'm getting pretty close to at
least a POC-level implementation of this. I'm currently working with
JsonObject instances throughout the pipeline, so to try this out I simply
stored the routing information within the element itself for simplicity's
sake. The element has a shape that looks something like this:

{
    "route": {
        "hosts": "...",
        "index": "...",
        ...
    },
    "all-other-fields-here"
}

And I've stripped back several of the layers of the routers (since I
already have all of the information in the element at that point). I tried
using something like this:

class DynamicElasticsearchSink : RichSinkFunction<JsonObject>(), CheckpointedFunction {
    private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
    private lateinit var configuration: Configuration

    override fun open(parameters: Configuration) {
        configuration = parameters
    }

    override fun invoke(element: JsonObject, context: SinkFunction.Context) {
        val route = getHost(element)
        // Check if we already have a router for this cluster
        var sink = sinkRoutes[route]
        if (sink == null) {
            // If not, create one
            sink = buildSinkFromRoute(element)
            sink.open(configuration)
            sinkRoutes[route] = sink
        }

        sink.invoke(element, context)
    }

    override fun initializeState(context: FunctionInitializationContext) {
        // No-op.
    }

    override fun snapshotState(context: FunctionSnapshotContext) {
        // This is used only to flush pending writes.
        for (sink in sinkRoutes.values) {
            sink.snapshotState(context)
        }
    }

    override fun close() {
        for (sink in sinkRoutes.values) {
            sink.close()
        }
    }

    private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
        val builder = ElasticsearchSink.Builder<JsonObject>(
            buildHostsFromElement(element),
            ElasticsearchRoutingFunction()
        )

        builder.setBulkFlushMaxActions(1)

        // TODO: Configure authorization if available
//        builder.setRestClientFactory { restClient ->
//            restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
//                override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
//                    // Configure authorization here
//                    val credentialsProvider = BasicCredentialsProvider().apply {
//                        setCredentials(
//                            AuthScope.ANY,
//                            UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
//                        )
//                    }
//
//                    return builder.setDefaultCredentialsProvider(credentialsProvider)
//                }
//            })
//        }

        return builder.build()
    }

    private fun buildHostsFromElement(element: JsonObject): List<HttpHost>{
        val transportAddresses = element
            .get("route").asJsonObject
            .get("hosts").asString

        // If there are multiple, they should be comma-delimited
        val addresses = transportAddresses.split(",")
        return addresses
            .filter { address -> address.isNotEmpty() }
            .map { address ->
                HttpHost.create(address)
            }
    }

    private fun getHost(element: JsonObject): String {
        return element
            .get("route").asJsonObject
            .get("hosts").asString
    }

    private class ElasticsearchRoutingFunction : ElasticsearchSinkFunction<JsonObject> {
        override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
            indexer.add(request(element))
        }

        private fun request(element: JsonObject): IndexRequest {
            // Access routing information
            val index = element
                .get("route").asJsonObject
                .get("index").asString

            // Strip off routing information
            element.remove("route")

            // Send the request
            return Requests.indexRequest()
                .index(index)
                .type("_doc")
                .source(mapOf(
                    "data" to "$element"
                ))
        }
    }
}

After running an integration test, I keep running into the following error
during the invocation of the child sink:

// The runtime context has not been initialized.
sink.invoke(element, context)

I can see the underlying sink getting initialized, the open call being
made, etc. However, for some reason there appears to be an issue related
to the context during the invoke call, namely "The runtime context has not
been initialized". I had assumed this would be alright since the context
for the "wrapper" should have already been initialized, but maybe there's
something that I'm missing.
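
One possible explanation, offered as an assumption rather than something
confirmed in this message: Flink sets the runtime context only on the
wrapper, so a child sink created by hand inside invoke never receives one
unless the wrapper passes it along (in Flink terms, presumably a call such
as sink.setRuntimeContext(runtimeContext) before sink.open(configuration)).
The sketch below models that idea in plain Java with no Flink dependency.
ModelSink, RoutingSink and Context are hypothetical stand-ins, not connector
classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of a rich function's lifecycle; none of these types are Flink classes. */
final class ChildSinkLifecycle {

    /** Stand-in for the runtime context that a framework injects into an operator. */
    static final class Context {
        final String taskName;

        Context(String taskName) {
            this.taskName = taskName;
        }
    }

    /** Stand-in for a rich sink: the context must be set before the sink is used. */
    static class ModelSink {
        private Context context;
        final StringBuilder written = new StringBuilder();

        void setContext(Context context) {
            this.context = context;
        }

        Context getContext() {
            if (context == null) {
                throw new IllegalStateException("The runtime context has not been initialized.");
            }
            return context;
        }

        void invoke(String element) {
            // Reading the context fails if nobody injected it beforehand.
            written.append(getContext().taskName).append(':').append(element).append(';');
        }
    }

    /** Wrapper that creates child sinks lazily, one per route. */
    static final class RoutingSink extends ModelSink {
        private final Map<String, ModelSink> children = new HashMap<>();
        private final boolean forwardContext;

        RoutingSink(boolean forwardContext) {
            this.forwardContext = forwardContext;
        }

        ModelSink childFor(String route) {
            return children.get(route);
        }

        void invoke(String route, String element) {
            ModelSink child = children.computeIfAbsent(route, r -> {
                ModelSink created = new ModelSink();
                if (forwardContext) {
                    // Only the wrapper was initialised from outside; hand its context down.
                    created.setContext(getContext());
                }
                return created;
            });
            child.invoke(element);
        }
    }
}
```

With forwardContext set to false the model fails on the first invoke with
the same message as above; with true the child writes normally.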

Also, please forgive any hastily written or nasty code as this is purely a
POC to see if I could get this to work using a single object. I have the
hopes of cleaning it up and genericizing it after I am confident that it
actually works.

Thanks so much again,

Rion

On Mon, Aug 23, 2021 at 11:12 AM David Morávek <dm...@apache.org> wrote:

> Hi Rion,
>
> Sorry for the late reply; I missed your previous message. Thanks Arvid for
> the reminder <3.
>
>> something like a MessageWrapper<ElementT, ConfigurationT> and pass those
>> elements to the sink, which would create the tenant-specific Elastic
>> connection from the ConfigurationT element and handle caching it and
>> then just grab the element and send it on its way?
>
> Yes, this is exactly what I had in mind. There should be almost no
> overhead, as the sink can easily be chained with your join function (the
> KeyedCoProcessFunction).
>
>> The shape of the elements being evicted from the process function (Is
>> a simple wrapper with the configuration for the sink enough here? Do I need
>> to explicitly initialize the sink within this function? Etc.)
>
> To write an element you need a configuration for the destination and
> the element itself, so a tuple of (ElasticConfiguration, Element)
> should be enough (that's basically your MessageWrapper<ElementT,
> ConfigurationT> class).
>
>> The actual use of the DynamicElasticsearchSink class (Would it just
>> be something like an .addSink(DynamicElasticsearchSink<String,
>> Configuration>()) or perhaps something else entirely?)
>
> I guess it could look something like the snippet below. It would
> definitely be good to play around with the DynamicElasticsearchSink API
> and make it more meaningful / user-friendly (the gist I've shared was just
> a very rough prototype to showcase the idea).
>
> static class Destination {
>
>     private final List<HttpHost> httpHosts;
>
>     Destination(List<HttpHost> httpHosts) {
>         this.httpHosts = httpHosts;
>     }
> }
>
> final DataStream<Tuple2<Destination, String>> toWrite = ...;
> toWrite.addSink(
>         new DynamicElasticsearchSink<>(
>                 new SinkRouter<
>                         Tuple2<Destination, String>,
>                         String,
>                         ElasticsearchSink<Tuple2<Destination, String>>>() {
>
>                     @Override
>                     public String getRoute(Tuple2<Destination, String> element) {
>                         // Construct a deterministic unique caching key for the
>                         // destination (this could be cheaper if you know the data).
>                         return element.f0.httpHosts.stream()
>                                 .map(HttpHost::toHostString)
>                                 .collect(Collectors.joining(","));
>                     }
>
>                     @Override
>                     public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>                             String cacheKey, Tuple2<Destination, String> element) {
>                         return new ElasticsearchSink.Builder<>(
>                                         element.f0.httpHosts,
>                                         (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>                                                 (el, ctx, indexer) -> {
>                                                     // Construct index request.
>                                                     final IndexRequest request = ...;
>                                                     indexer.add(request);
>                                                 })
>                                 .build();
>                     }
>                 }));
>
>
> I hope this helps ;)
>
> Best,
> D.
>
>
> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <ri...@gmail.com>
> wrote:
>
>> Thanks for this suggestion David, it's extremely helpful.
>>
>> Since this will vary depending on the elements retrieved from a separate
>> stream, I'm guessing something like the following would be roughly the
>> avenue to continue down:
>>
>> fun main(args: Array<String>) {
>>     val parameters = mergeParametersFromProperties(args)
>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>
>>     // Get the stream for tenant-specific Elastic configurations
>>     val connectionStream = stream
>>         .fromSource(
>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>             WatermarkStrategy.noWatermarks(),
>>             "elastic-configs"
>>         )
>>
>>     // Get the stream of incoming messages to be routed to Elastic
>>     stream
>>         .fromSource(
>>             KafkaSource.of(parameters, listOf("messages")),
>>             WatermarkStrategy.noWatermarks(),
>>             "messages"
>>         )
>>         .keyBy { message ->
>>             // Key by the tenant in the message
>>             message.getTenant()
>>         }
>>         .connect(
>>             // Connect the messages stream with the configurations
>>             connectionStream
>>         )
>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>             // For this key, we need to store all of the previous messages in state
>>             // in the case where we don't have a given mapping for this tenant yet
>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>             lateinit var configState: ValueState<String>
>>
>>             override fun open(parameters: Configuration) {
>>                 super.open(parameters)
>>                 // Initialize the states
>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>                 configState = runtimeContext.getState(configStateDesc)
>>             }
>>
>>             // When an element is received
>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>                 // Check if we have a mapping
>>                 if (configState.value() == null){
>>                     // We don't have a mapping for this tenant, store messages until we get it
>>                     messagesAwaitingConfigState.add(message)
>>                 }
>>                 else {
>>                     // Output the record with some indicator of the route?
>>                     out.collect(message)
>>                 }
>>             }
>>
>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>                 // If this mapping is for this specific tenant, store it and flush the pending
>>                 // records in state
>>                 if (config.getTenant() == context.currentKey){
>>                     configState.update(config)
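>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>                     messagesToFlush.forEach { message ->
>>                         out.collect(message)
>>                     }
>>                     // Clear the buffer so flushed messages are not emitted again
>>                     messagesAwaitingConfigState.clear()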
>>                 }
>>             }
>>
>>             // State descriptors
>>             val awaitingStateDesc = ListStateDescriptor(
>>                 "messages-awaiting-config",
>>                 TypeInformation.of(String::class.java)
>>             )
>>
>>             val configStateDesc = ValueStateDescriptor(
>>                 "elastic-config",
>>                 TypeInformation.of(String::class.java)
>>             )
>>         })
>>
>>     stream.executeAsync("$APPLICATION_NAME-job")
>> }
>>
>> Basically, connect my tenant-specific configuration stream with my
>> incoming messages (keyed by tenant) and buffer them until I have a
>> corresponding configuration (to avoid race-conditions). However, I'm
>> guessing what will happen here is rather than directly outputting the
>> messages from this process function, I'd construct some type of wrapper
>> here with the necessary routing/configuration for the message (obtained via
>> the configuration stream) along with the element, which might be something
>> like a MessageWrapper<ElementT, ConfigurationT> and pass those elements
>> to the sink, which would create the tenant-specific Elastic connection from
>> the ConfigurationT element and handle caching it and then just grab the
>> element and send it on its way?
>>
>> Those are really the only bits I'm stuck on at the moment:
>>
>>    1. The shape of the elements being evicted from the process function
>>    (Is a simple wrapper with the configuration for the sink enough here? Do I
>>    need to explicitly initialize the sink within this function? Etc.)
>>    2. The actual use of the DynamicElasticsearchSink class (Would it
>>    just be something like an .addSink(DynamicElasticsearchSink<String,
>>    Configuration>()) or perhaps something else entirely?)
>>
>> Thanks again so much for the advice thus far David, it's greatly
>> appreciated.
>>
>> Rion
>>
>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <dm...@apache.org> wrote:
>>
>>> To give you a better idea, at a high level I think it could look something
>>> like this
>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8> [1].
>>>
>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>
>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <ri...@gmail.com>
>>> wrote:
>>>
>>>> Hi David,
>>>>
>>>> Thanks for your response! I think there are currently quite a few
>>>> unknowns on my end in terms of what production loads look like, but I
>>>> think the number of clusters shouldn’t be too large (and will either rarely
>>>> change or have new entries come in at runtime, but it needs to support
>>>> that).
>>>>
>>>> I think the dynamic approach might be a good route to explore with
>>>> actual changes to the Elasticsearch sink as a longer term option. I’m not
>>>> sure what the dynamic one would look like at the moment though, perhaps
>>>> that’s something you’d be able to advise on?
>>>>
>>>> Given that all the records are keyed for a given tenant and I would
>>>> have the mappings stored in state, is it possible that within the open()
>>>> function for this dynamic route to access the state and initialize the
>>>> client there? Or maybe there’s some other approach (such as grouping by
>>>> clusters and dynamically handling indices)?
>>>>
>>>> I’d be happy to give a shot at making the appropriate changes to the
>>>> sink as well, although I’m far from an Elastic expert. If you point me in
>>>> the right direction, I may be able to help out.
>>>>
>>>> Thanks much!
>>>>
>>>> Rion
>>>>
>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <dm...@apache.org> wrote:
>>>>
>>>> 
>>>> Hi Rion,
>>>>
>>>> As you probably already know, for dynamic indices, you can simply
>>>> implement your own ElasticsearchSinkFunction
>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>> [1], where you can create any request that the elastic client supports.
>>>>
>>>> The tricky part is how to implement dynamic routing into multiple
>>>> clusters.
>>>> - If the elastic clusters are known upfront (before submitting the job),
>>>> you can easily create multiple elastic sinks and put a simple filter in
>>>> front of each one (this is basically what the split operator does).
>>>> - If you discover elastic clusters at runtime, this would require some
>>>> changes to the current ElasticsearchSink implementation. I think this may
>>>> actually be as simple as introducing something like a
>>>> DynamicElasticsearchSink that could dynamically create and manage "child"
>>>> sinks. This would probably require some thought about how to manage
>>>> consumed resources (memory), because the number of child sinks could
>>>> potentially be unbounded. This could of course be simplified if the
>>>> underlying elastic client already supports that, which I'm not aware of.
>>>> If you'd like to take this path, it would definitely be a great
>>>> contribution to Flink (I'm able to provide some guidance).
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <ri...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi folks,
>>>>>
>>>>> I have a use-case that I wanted to initially pose to the mailing list
>>>>> as I’m not terribly familiar with the Elasticsearch connector to ensure I’m
>>>>> not going down the wrong path trying to accomplish this in Flink (or if
>>>>> something downstream might be a better option).
>>>>>
>>>>> Basically, I have the following pieces to the puzzle:
>>>>>
>>>>>    - A stream of tenant-specific events
>>>>>    - An HTTP endpoint containing mappings for tenant-specific Elastic
>>>>>    cluster information (as each tenant has its own specific Elastic
>>>>>    cluster/index)
>>>>>
>>>>> What I’m hoping to accomplish is the following:
>>>>>
>>>>>    1. One stream will periodically poll the HTTP endpoint and store
>>>>>    these cluster mappings in state (keyed by tenant with cluster info as the
>>>>>    value)
>>>>>    2. The event stream will be keyed by tenant and connected to the
>>>>>    cluster mappings stream.
>>>>>    3. I’ll need an Elasticsearch sink that can route the
>>>>>    tenant-specific event data to its corresponding cluster/index from the
>>>>>    mapping source.
>>>>>
>>>>> I know that the existing Elasticsearch sink supports dynamic indices,
>>>>> however I didn’t know if it’s possible to adjust the cluster like I would
>>>>> need on a per-tenant basis or if there’s a better approach here?
>>>>>
>>>>> Any advice would be appreciated.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Rion
>>>>>
>>>>

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

Posted by David Morávek <dm...@apache.org>.
Hi Rion,

Sorry for the late reply; I missed your previous message. Thanks Arvid for
the reminder <3.

> something like a MessageWrapper<ElementT, ConfigurationT> and pass those
> elements to the sink, which would create the tenant-specific Elastic
> connection from the ConfigurationT element and handle caching it and then
> just grab the element and send it on its way?


Yes, this is exactly what I had in mind. There should be almost no overhead,
as the sink can easily be chained with your join function (the
KeyedCoProcessFunction).

> The shape of the elements being evicted from the process function (Is a
> simple wrapper with the configuration for the sink enough here? Do I need
> to explicitly initialize the sink within this function? Etc.)

To write an element you need a configuration for the destination and the
element itself, so a tuple of (ElasticConfiguration, Element) should be
enough (that's basically your MessageWrapper<ElementT, ConfigurationT>
class).
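
The join that produces such (configuration, element) pairs can be sketched
in plain Java, without any Flink types: elements for a tenant are buffered
until that tenant's configuration arrives, then flushed once and the buffer
is cleared. This is only a model under stated assumptions; Routed and
TenantJoin are hypothetical names, and a real job would keep this in keyed
state rather than in-memory maps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of pairing elements with a per-tenant destination configuration. */
final class RoutedElementSketch {

    /** An element together with the configuration of its destination. */
    static final class Routed<C, E> {
        final C config;
        final E element;

        Routed(C config, E element) {
            this.config = config;
            this.element = element;
        }
    }

    /** Buffers elements per tenant until a configuration for that tenant is known. */
    static final class TenantJoin<C, E> {
        private final Map<String, C> configs = new HashMap<>();
        private final Map<String, List<E>> pending = new HashMap<>();

        /** Returns the routed element, or nothing if the tenant has no configuration yet. */
        List<Routed<C, E>> onElement(String tenant, E element) {
            List<Routed<C, E>> out = new ArrayList<>();
            C config = configs.get(tenant);
            if (config == null) {
                pending.computeIfAbsent(tenant, t -> new ArrayList<>()).add(element);
            } else {
                out.add(new Routed<>(config, element));
            }
            return out;
        }

        /** Stores the configuration and flushes everything buffered for the tenant. */
        List<Routed<C, E>> onConfig(String tenant, C config) {
            configs.put(tenant, config);
            List<Routed<C, E>> out = new ArrayList<>();
            // remove() empties the buffer, so nothing is emitted twice.
            List<E> buffered = pending.remove(tenant);
            if (buffered != null) {
                for (E element : buffered) {
                    out.add(new Routed<>(config, element));
                }
            }
            return out;
        }
    }
}
```

The sink then only ever sees Routed values, so it needs no access to the
join's state.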
>
> The actual use of the DynamicElasticsearchSink class (Would it just be
> something like an .addSink(DynamicElasticsearchSink<String,
> Configuration>()) or perhaps something else entirely?)

I guess it could look something like the snippet below. It would definitely
be good to play around with the DynamicElasticsearchSink API and make it more
meaningful / user-friendly (the gist I've shared was just a very rough
prototype to showcase the idea).

// DynamicElasticsearchSink and SinkRouter are the prototype classes from the gist.
static class Destination {

    private final List<HttpHost> httpHosts;

    Destination(List<HttpHost> httpHosts) {
        this.httpHosts = httpHosts;
    }
}

final DataStream<Tuple2<Destination, String>> toWrite = ...;
toWrite.addSink(
        new DynamicElasticsearchSink<>(
                new SinkRouter<
                        Tuple2<Destination, String>,
                        String,
                        ElasticsearchSink<Tuple2<Destination, String>>>() {

                    @Override
                    public String getRoute(Tuple2<Destination, String> element) {
                        // Construct a deterministic, unique caching key for the
                        // destination (this could be cheaper if you know the data).
                        return element.f0.httpHosts.stream()
                                .map(HttpHost::toHostString)
                                .collect(Collectors.joining(","));
                    }

                    @Override
                    public ElasticsearchSink<Tuple2<Destination, String>> createSink(
                            String cacheKey, Tuple2<Destination, String> element) {
                        return new ElasticsearchSink.Builder<>(
                                        element.f0.httpHosts,
                                        (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
                                                (el, ctx, indexer) -> {
                                                    // Construct the index request.
                                                    final IndexRequest request = ...;
                                                    indexer.add(request);
                                                })
                                .build();
                    }
                }));
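
To make the caching behaviour a bit more concrete, here is a minimal plain-Java
sketch (no Flink dependencies) of what a dynamic sink could do with the route:
derive a key per element, create a child sink the first time a key is seen, and
reuse it afterwards. All names in it (RoutingSketch, Routed, ChildSink,
DynamicSink) are hypothetical stand-ins and not classes from Flink or from the
gist. Sorting the hosts before joining them is an extra assumption, made so that
the key does not depend on the order in which the hosts are listed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-ins only; none of these types exist in Flink or in the gist.
class RoutingSketch {

    /** An element paired with the hosts of the cluster it should be written to. */
    static final class Routed {
        final List<String> hosts;
        final String payload;

        Routed(List<String> hosts, String payload) {
            this.hosts = hosts;
            this.payload = payload;
        }
    }

    /** Stand-in for a per-cluster child sink; it only records what it receives. */
    static final class ChildSink {
        final String route;
        final List<String> written = new ArrayList<>();

        ChildSink(String route) {
            this.route = route;
        }

        void invoke(String payload) {
            written.add(payload);
        }
    }

    /** Creates a child sink the first time a route is seen and reuses it afterwards. */
    static final class DynamicSink {
        private final Map<String, ChildSink> sinks = new HashMap<>();

        /** Cache key: the sorted host list, so one cluster always maps to one key. */
        static String routeOf(Routed element) {
            return element.hosts.stream().sorted().collect(Collectors.joining(","));
        }

        void invoke(Routed element) {
            ChildSink sink = sinks.computeIfAbsent(routeOf(element), ChildSink::new);
            sink.invoke(element.payload);
        }

        int childCount() {
            return sinks.size();
        }

        ChildSink child(String route) {
            return sinks.get(route);
        }
    }

    public static void main(String[] args) {
        DynamicSink sink = new DynamicSink();
        sink.invoke(new Routed(Arrays.asList("es-a:9200", "es-b:9200"), "event-1"));
        sink.invoke(new Routed(Arrays.asList("es-b:9200", "es-a:9200"), "event-2"));
        sink.invoke(new Routed(Arrays.asList("es-c:9200"), "event-3"));
        System.out.println(sink.childCount() + " child sinks created");
    }
}
```

In a real sink the child would also need to be opened, flushed on checkpoints
and closed, which this sketch leaves out.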

I hope this helps ;)

Best,
D.


On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <ri...@gmail.com> wrote:

> Thanks for this suggestion David, it's extremely helpful.
>
> Since this will vary depending on the elements retrieved from a separate
> stream, I'm guessing something like the following would be roughly the
> avenue to continue down:
>
> fun main(args: Array<String>) {
>     val parameters = mergeParametersFromProperties(args)
>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>
>     // Get the stream for tenant-specific Elastic configurations
>     val connectionStream = stream
>         .fromSource(
>             KafkaSource.of(parameters, listOf("elastic-configs")),
>             WatermarkStrategy.noWatermarks(),
>             "elastic-configs"
>         )
>
>     // Get the stream of incoming messages to be routed to Elastic
>     stream
>         .fromSource(
>             KafkaSource.of(parameters, listOf("messages")),
>             WatermarkStrategy.noWatermarks(),
>             "messages"
>         )
>         .keyBy { message ->
>             // Key by the tenant in the message
>             message.getTenant()
>         }
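>         .connect(
>             // Connect the messages stream with the configurations, keyed by the
>             // same tenant (KeyedCoProcessFunction requires both inputs to be keyed)
>             connectionStream.keyBy { config -> config.getTenant() }
>         )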
>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>             // For this key, we need to store all of the previous messages in state
>             // in the case where we don't have a given mapping for this tenant yet
>             lateinit var messagesAwaitingConfigState: ListState<String>
>             lateinit var configState: ValueState<String>
>
>             override fun open(parameters: Configuration) {
>                 super.open(parameters)
>                 // Initialize the states
>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>                 configState = runtimeContext.getState(configStateDesc)
>             }
>
>             // When an element is received
>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>                 // Check if we have a mapping
>                 if (configState.value() == null){
>                     // We don't have a mapping for this tenant, store messages until we get it
>                     messagesAwaitingConfigState.add(message)
>                 }
>                 else {
>                     // Output the record with some indicator of the route?
>                     out.collect(message)
>                 }
>             }
>
>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>                 // If this mapping is for this specific tenant, store it and flush the pending
>                 // records in state
>                 if (config.getTenant() == context.currentKey){
>                     configState.update(config)
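>                     val messagesToFlush = messagesAwaitingConfigState.get()
>                     messagesToFlush.forEach { message ->
>                         out.collect(message)
>                     }
>                     // Clear the buffer so records are not re-emitted on a later config update
>                     messagesAwaitingConfigState.clear()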
>                 }
>             }
>
>             // State descriptors
>             val awaitingStateDesc = ListStateDescriptor(
>                 "messages-awaiting-config",
>                 TypeInformation.of(String::class.java)
>             )
>
>             val configStateDesc = ValueStateDescriptor(
>                 "elastic-config",
>                 TypeInformation.of(String::class.java)
>             )
>         })
>
>     stream.executeAsync("$APPLICATION_NAME-job")
> }
>
> Basically, I'd connect my tenant-specific configuration stream with my
> incoming messages (keyed by tenant) and buffer the messages until I have a
> corresponding configuration (to avoid race conditions). However, I'm
> guessing that rather than directly outputting the messages from this
> process function, I'd construct some type of wrapper with the necessary
> routing/configuration for the message (obtained via the configuration
> stream) along with the element, which might be something like a
> MessageWrapper<ElementT, ConfigurationT>. I'd then pass those elements
> to the sink, which would create the tenant-specific Elastic connection from
> the ConfigurationT element, handle caching it, and then just grab the
> element and send it on its way?
>
> Those are really the only bits I'm stuck on at the moment:
>
>    1. The shape of the elements being evicted from the process function
>    (Is a simple wrapper with the configuration for the sink enough here? Do I
>    need to explicitly initialize the sink within this function? Etc.)
>    2. The actual use of the DynamicElasticsearchSink class (Would it just
>    be something like an .addSink(DynamicElasticSearchSink<String,
>    Configuration>()) or perhaps something else entirely?)
>
> Thanks again so much for the advice thus far David, it's greatly
> appreciated.
>
> Rion

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

Posted by Rion Williams <ri...@gmail.com>.
Thanks for this suggestion David, it's extremely helpful.

Since this will vary depending on the elements retrieved from a separate
stream, I'm guessing something like the following would be roughly the
avenue to continue down:

fun main(args: Array<String>) {
    val parameters = mergeParametersFromProperties(args)
    val stream = StreamExecutionEnvironment.getExecutionEnvironment()

    // Get the stream for tenant-specific Elastic configurations
    val connectionStream = stream
        .fromSource(
            KafkaSource.of(parameters, listOf("elastic-configs")),
            WatermarkStrategy.noWatermarks(),
            "elastic-configs"
        )

    // Get the stream of incoming messages to be routed to Elastic
    stream
        .fromSource(
            KafkaSource.of(parameters, listOf("messages")),
            WatermarkStrategy.noWatermarks(),
            "messages"
        )
        .keyBy { message ->
            // Key by the tenant in the message
            message.getTenant()
        }
        .connect(
            // Connect the messages stream with the configurations, keyed by the
            // same tenant (KeyedCoProcessFunction requires both inputs to be keyed)
            connectionStream.keyBy { config -> config.getTenant() }
        )
        .process(object : KeyedCoProcessFunction<String, String, String, String>() {
            // For this key, we need to store all of the previous messages in state
            // in the case where we don't have a given mapping for this tenant yet
            lateinit var messagesAwaitingConfigState: ListState<String>
            lateinit var configState: ValueState<String>

            override fun open(parameters: Configuration) {
                super.open(parameters)
                // Initialize the states
                messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
                configState = runtimeContext.getState(configStateDesc)
            }

            // When an element is received
            override fun processElement1(message: String, context: Context, out: Collector<String>) {
                // Check if we have a mapping
                if (configState.value() == null) {
                    // We don't have a mapping for this tenant, store messages until we get it
                    messagesAwaitingConfigState.add(message)
                } else {
                    // Output the record with some indicator of the route?
                    out.collect(message)
                }
            }

            override fun processElement2(config: String, context: Context, out: Collector<String>) {
                // If this mapping is for this specific tenant, store it and flush the
                // pending records in state
                if (config.getTenant() == context.currentKey) {
                    configState.update(config)
                    val messagesToFlush = messagesAwaitingConfigState.get()
                    messagesToFlush.forEach { message ->
                        out.collect(message)
                    }
                    // Clear the buffer so records are not re-emitted on a later config update
                    messagesAwaitingConfigState.clear()
                }
            }

            // State descriptors
            val awaitingStateDesc = ListStateDescriptor(
                "messages-awaiting-config",
                TypeInformation.of(String::class.java)
            )

            val configStateDesc = ValueStateDescriptor(
                "elastic-config",
                TypeInformation.of(String::class.java)
            )
        })

    stream.executeAsync("$APPLICATION_NAME-job")
}

Basically, I'd connect my tenant-specific configuration stream with my incoming
messages (keyed by tenant) and buffer the messages until I have a corresponding
configuration (to avoid race conditions). However, I'm guessing that rather
than directly outputting the messages from this process function, I'd
construct some type of wrapper with the necessary routing/configuration for
the message (obtained via the configuration stream) along with the element,
which might be something like a MessageWrapper<ElementT, ConfigurationT>. I'd
then pass those elements to the sink, which would create the tenant-specific
Elastic connection from the ConfigurationT element, handle caching it, and
then just grab the element and send it on its way?

Those are really the only bits I'm stuck on at the moment:

   1. The shape of the elements being evicted from the process function (Is
   a simple wrapper with the configuration for the sink enough here? Do I need
   to explicitly initialize the sink within this function? Etc.)
   2. The actual use of the DynamicElasticsearchSink class (Would it just
   be something like an .addSink(DynamicElasticSearchSink<String,
   Configuration>()) or perhaps something else entirely?)
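
The buffer-then-wrap idea described above can be sketched outside of Flink as
follows. This is only an illustration in plain Java, with in-memory maps
standing in for keyed state. MessageWrapper follows the name used above, while
BufferUntilConfigSketch and TenantJoin are hypothetical names. Messages that
arrive before a tenant's configuration are held back, and once the
configuration shows up they are emitted as (element, configuration) pairs and
the buffer is dropped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; plain maps stand in for Flink's keyed state.
class BufferUntilConfigSketch {

    /** Pairs an element with the configuration the sink needs to route it. */
    static final class MessageWrapper<E, C> {
        final E element;
        final C configuration;

        MessageWrapper(E element, C configuration) {
            this.element = element;
            this.configuration = configuration;
        }
    }

    /** Holds per-tenant configuration and the messages still waiting for one. */
    static final class TenantJoin<E, C> {
        private final Map<String, C> configByTenant = new HashMap<>();
        private final Map<String, List<E>> pendingByTenant = new HashMap<>();

        /** Emits the wrapped element if a configuration is known, otherwise buffers it. */
        List<MessageWrapper<E, C>> onMessage(String tenant, E element) {
            C config = configByTenant.get(tenant);
            if (config == null) {
                pendingByTenant.computeIfAbsent(tenant, t -> new ArrayList<>()).add(element);
                return Collections.emptyList();
            }
            return Collections.singletonList(new MessageWrapper<>(element, config));
        }

        /** Stores the configuration and flushes (and removes) any buffered elements. */
        List<MessageWrapper<E, C>> onConfig(String tenant, C config) {
            configByTenant.put(tenant, config);
            List<E> pending = pendingByTenant.remove(tenant);
            List<MessageWrapper<E, C>> out = new ArrayList<>();
            if (pending != null) {
                for (E element : pending) {
                    out.add(new MessageWrapper<>(element, config));
                }
            }
            return out;
        }
    }

    public static void main(String[] args) {
        TenantJoin<String, String> join = new TenantJoin<>();
        System.out.println(join.onMessage("tenant-1", "m1").size() + " emitted before config");
        System.out.println(join.onConfig("tenant-1", "es-1:9200").size() + " flushed on config");
        System.out.println(join.onMessage("tenant-1", "m2").size() + " emitted after config");
    }
}
```

Removing the buffer when flushing matters: otherwise a later configuration
update for the same tenant would emit the old messages a second time.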

Thanks again so much for the advice thus far David, it's greatly
appreciated.

Rion

On Fri, Aug 13, 2021 at 9:04 AM David Morávek <dm...@apache.org> wrote:

> To give you a better idea, at a high level I think it could look something
> like this <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8> [1].
>
> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

Posted by David Morávek <dm...@apache.org>.
To give you a better idea, at a high level I think it could look something like
this <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8> [1].

[1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8

On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <ri...@gmail.com> wrote:

> Hi David,
>
> Thanks for your response! I think there are currently quite a few unknowns
> on my end in terms of what production loads look like, but I think the
> number of clusters shouldn’t be too large (and will either rarely change or
> have new entries come in at runtime, but it needs to support that).
>
> I think the dynamic approach might be a good route to explore with actual
> changes to the Elasticsearch sink as a longer term option. I’m not sure
> what the dynamic one would look like at the moment though, perhaps that’s
> something you’d be able to advise on?
>
> Given that all the records are keyed for a given tenant and I would have
> the mappings stored in state, is it possible, within the open() function
> for this dynamic route, to access the state and initialize the client
> there? Or maybe there’s some other approach (such as grouping by
> clusters and dynamically handling indices)?
>
> I’d be happy to give a shot at making the appropriate changes to the sink
> as well, although I’m far from an Elastic expert. If you point me in the
> right direction, I may be able to help out.
>
> Thanks much!
>
> Rion

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

Posted by Rion Williams <ri...@gmail.com>.
Hi David,

Thanks for your response! I think there are currently quite a few unknowns on my end in terms of what production loads look like, but I think the number of clusters shouldn’t be too large (and will either rarely change or have new entries come in at runtime, but it needs to support that).

I think the dynamic approach might be a good route to explore with actual changes to the Elasticsearch sink as a longer term option. I’m not sure what the dynamic one would look like at the moment though, perhaps that’s something you’d be able to advise on?

Given that all the records are keyed for a given tenant and I would have the mappings stored in state, is it possible, within the open() function for this dynamic route, to access the state and initialize the client there? Or maybe there’s some other approach (such as grouping by clusters and dynamically handling indices)?

I’d be happy to give a shot at making the appropriate changes to the sink as well, although I’m far from an Elastic expert. If you point me in the right direction, I may be able to help out.

Thanks much!

Rion

> On Aug 13, 2021, at 6:52 AM, David Morávek <dm...@apache.org> wrote:
> 
> 
> Hi Rion,
> 
> As you probably already know, for dynamic indices you can simply implement your own ElasticsearchSinkFunction [1], where you can create any request that the Elastic client supports.
> 
> The tricky part is how to implement dynamic routing to multiple clusters.
> - If the Elastic clusters are known upfront (before submitting the job), you can easily create multiple Elastic sinks and prepend each of them with a simple filter (this is basically what the split operator does).
> - If you discover Elastic clusters at runtime, this would require some changes to the current ElasticsearchSink implementation. I think this may actually be as simple as introducing something like a DynamicElasticsearchSink that could dynamically create and manage "child" sinks. This would probably require some thought about how to manage consumed resources (memory), because the number of child sinks could potentially be unbounded. This could of course be simplified if the underlying Elastic client already supports that, which I'm not aware of. If you'd like to take this path, it would definitely be a great contribution to Flink (I'm able to provide some guidance).
> 
> [1] https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
> 
> Best,
> D.

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

Posted by David Morávek <dm...@apache.org>.
Hi Rion,

As you probably already know, for dynamic indices you can simply implement
your own ElasticsearchSinkFunction
<https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
[1], where you can create any request that the Elastic client supports.

The tricky part is how to implement dynamic routing to multiple clusters.
- If the Elastic clusters are known upfront (before submitting the job), you
can easily create multiple Elastic sinks and prepend each of them with a
simple filter (this is basically what the split operator does).
- If you discover Elastic clusters at runtime, this would require some
changes to the current ElasticsearchSink implementation. I think this may
actually be as simple as introducing something like a
DynamicElasticsearchSink that could dynamically create and manage "child"
sinks. This would probably require some thought about how to manage
consumed resources (memory), because the number of child sinks could
potentially be unbounded. This could of course be simplified if the underlying
Elastic client already supports that, which I'm not aware of. If you'd like
to take this path, it would definitely be a great contribution to Flink
(I'm able to provide some guidance).
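
One possible way to keep the number of child sinks bounded is a small LRU
cache that closes the least recently used child once a limit is exceeded. The
following is only a plain-Java sketch of that idea, not an existing Flink API.
BoundedSinkCacheSketch, ChildSink, RecordingSink and LruSinkCache are
hypothetical names, and the limit of 2 in main is an arbitrary example value.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch; the names below are illustrative and not part of Flink.
class BoundedSinkCacheSketch {

    /** Minimal stand-in for a child sink that holds resources until closed. */
    interface ChildSink {
        void close();
    }

    /** Child sink that records its route in a shared log when it is closed. */
    static final class RecordingSink implements ChildSink {
        private final String route;
        private final List<String> closedLog;

        RecordingSink(String route, List<String> closedLog) {
            this.route = route;
            this.closedLog = closedLog;
        }

        @Override
        public void close() {
            closedLog.add(route);
        }
    }

    /** Keeps at most maxSinks child sinks and closes the least recently used one. */
    static final class LruSinkCache {
        private final int maxSinks;
        private final Function<String, ChildSink> factory;
        // accessOrder = true: iteration goes from least to most recently used.
        private final LinkedHashMap<String, ChildSink> sinks =
                new LinkedHashMap<>(16, 0.75f, true);

        LruSinkCache(int maxSinks, Function<String, ChildSink> factory) {
            if (maxSinks < 1) {
                throw new IllegalArgumentException("maxSinks must be at least 1");
            }
            this.maxSinks = maxSinks;
            this.factory = factory;
        }

        ChildSink get(String route) {
            ChildSink sink = sinks.get(route); // also marks the entry as recently used
            if (sink == null) {
                sink = factory.apply(route);
                sinks.put(route, sink);
                evictOverflow();
            }
            return sink;
        }

        private void evictOverflow() {
            Iterator<Map.Entry<String, ChildSink>> it = sinks.entrySet().iterator();
            while (sinks.size() > maxSinks && it.hasNext()) {
                ChildSink eldest = it.next().getValue();
                it.remove();
                eldest.close();
            }
        }

        int size() {
            return sinks.size();
        }

        boolean contains(String route) {
            return sinks.containsKey(route);
        }
    }

    public static void main(String[] args) {
        List<String> closed = new ArrayList<>();
        LruSinkCache cache = new LruSinkCache(2, route -> new RecordingSink(route, closed));
        cache.get("cluster-a");
        cache.get("cluster-b");
        cache.get("cluster-a"); // touch a, so b becomes the eviction candidate
        cache.get("cluster-c");
        System.out.println("closed: " + closed);
    }
}
```

A real implementation would have to flush any pending bulk requests of a child
before closing it, so that eviction does not lose data.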

[1]
https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java

Best,
D.

On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <ri...@gmail.com> wrote:

> Hi folks,
>
> I have a use-case that I wanted to initially pose to the mailing list as
> I’m not terribly familiar with the Elasticsearch connector to ensure I’m
> not going down the wrong path trying to accomplish this in Flink (or if
> something downstream might be a better option).
>
> Basically, I have the following pieces to the puzzle:
>
>    - A stream of tenant-specific events
>    - An HTTP endpoint containing mappings for tenant-specific Elastic
>    cluster information (as each tenant has its own specific Elastic
>    cluster/index)
>
> What I’m hoping to accomplish is the following:
>
>    1. One stream will periodically poll the HTTP endpoint and store these
>    cluster mappings in state (keyed by tenant with cluster info as the value)
>    2. The event stream will be keyed by tenant and connected to the
>    cluster mappings stream.
>    3. I’ll need an Elasticsearch sink that can route the
>    tenant-specific event data to its corresponding cluster/index from the
>    mapping source.
>
> I know that the existing Elasticsearch sink supports dynamic indices,
> however I didn’t know if it’s possible to adjust the cluster like I would
> need on a per-tenant basis or if there’s a better approach here?
>
> Any advice would be appreciated.
>
> Thanks,
>
> Rion
>