You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:15:01 UTC
[33/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade
to librdkafka 0.11.4
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_assignor.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_assignor.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_assignor.c
deleted file mode 100644
index 45946db..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_assignor.c
+++ /dev/null
@@ -1,551 +0,0 @@
-/*
- * librdkafka - The Apache Kafka C/C++ library
- *
- * Copyright (c) 2015 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-#include "rdkafka_int.h"
-#include "rdkafka_assignor.h"
-
-#include <ctype.h>
-
-/**
- * Clear out and free any memory used by the member, but not the rkgm itself.
- */
-void rd_kafka_group_member_clear (rd_kafka_group_member_t *rkgm) {
- if (rkgm->rkgm_subscription)
- rd_kafka_topic_partition_list_destroy(rkgm->rkgm_subscription);
-
- if (rkgm->rkgm_assignment)
- rd_kafka_topic_partition_list_destroy(rkgm->rkgm_assignment);
-
- rd_list_destroy(&rkgm->rkgm_eligible);
-
- if (rkgm->rkgm_member_id)
- rd_kafkap_str_destroy(rkgm->rkgm_member_id);
-
- if (rkgm->rkgm_userdata)
- rd_kafkap_bytes_destroy(rkgm->rkgm_userdata);
-
- if (rkgm->rkgm_member_metadata)
- rd_kafkap_bytes_destroy(rkgm->rkgm_member_metadata);
-
- memset(rkgm, 0, sizeof(*rkgm));
-}
-
-
-/**
- * Member id string comparator (takes rd_kafka_group_member_t *)
- */
-int rd_kafka_group_member_cmp (const void *_a, const void *_b) {
- const rd_kafka_group_member_t *a =
- (const rd_kafka_group_member_t *)_a;
- const rd_kafka_group_member_t *b =
- (const rd_kafka_group_member_t *)_b;
-
- return rd_kafkap_str_cmp(a->rkgm_member_id, b->rkgm_member_id);
-}
-
-
-/**
- * Returns true if member subscribes to topic, else false.
- */
-int
-rd_kafka_group_member_find_subscription (rd_kafka_t *rk,
- const rd_kafka_group_member_t *rkgm,
- const char *topic) {
- int i;
-
- /* Match against member's subscription. */
- for (i = 0 ; i < rkgm->rkgm_subscription->cnt ; i++) {
- const rd_kafka_topic_partition_t *rktpar =
- &rkgm->rkgm_subscription->elems[i];
-
- if (rd_kafka_topic_partition_match(rk, rkgm, rktpar,
- topic, NULL))
- return 1;
- }
-
- return 0;
-}
-
-
-
-static rd_kafkap_bytes_t *
-rd_kafka_consumer_protocol_member_metadata_new (
- const rd_list_t *topics,
- const void *userdata, size_t userdata_size) {
- rd_kafka_buf_t *rkbuf;
- rd_kafkap_bytes_t *kbytes;
- int i;
- int topic_cnt = rd_list_cnt(topics);
- const rd_kafka_topic_info_t *tinfo;
- size_t len;
-
- /*
- * MemberMetadata => Version Subscription AssignmentStrategies
- * Version => int16
- * Subscription => Topics UserData
- * Topics => [String]
- * UserData => Bytes
- */
-
- rkbuf = rd_kafka_buf_new(1, 100 + (topic_cnt * 100) + userdata_size);
-
- rd_kafka_buf_write_i16(rkbuf, 0);
- rd_kafka_buf_write_i32(rkbuf, topic_cnt);
- RD_LIST_FOREACH(tinfo, topics, i)
- rd_kafka_buf_write_str(rkbuf, tinfo->topic, -1);
- if (userdata)
- rd_kafka_buf_write_bytes(rkbuf, userdata, userdata_size);
- else /* Kafka 0.9.0.0 cant parse NULL bytes, so we provide empty. */
- rd_kafka_buf_write_bytes(rkbuf, "", 0);
-
- /* Get binary buffer and allocate a new Kafka Bytes with a copy. */
- rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
- len = rd_slice_remains(&rkbuf->rkbuf_reader);
- kbytes = rd_kafkap_bytes_new(NULL, (int32_t)len);
- rd_slice_read(&rkbuf->rkbuf_reader, (void *)kbytes->data, len);
- rd_kafka_buf_destroy(rkbuf);
-
- return kbytes;
-
-}
-
-
-
-
-rd_kafkap_bytes_t *
-rd_kafka_assignor_get_metadata (rd_kafka_assignor_t *rkas,
- const rd_list_t *topics) {
- return rd_kafka_consumer_protocol_member_metadata_new(
- topics, rkas->rkas_userdata,
- rkas->rkas_userdata_size);
-}
-
-
-
-
-
-/**
- * Returns 1 if all subscriptions are satifised for this member, else 0.
- */
-static int rd_kafka_member_subscription_match (
- rd_kafka_cgrp_t *rkcg,
- rd_kafka_group_member_t *rkgm,
- const rd_kafka_metadata_topic_t *topic_metadata,
- rd_kafka_assignor_topic_t *eligible_topic) {
- int i;
- int has_regex = 0;
- int matched = 0;
-
- /* Match against member's subscription. */
- for (i = 0 ; i < rkgm->rkgm_subscription->cnt ; i++) {
- const rd_kafka_topic_partition_t *rktpar =
- &rkgm->rkgm_subscription->elems[i];
- int matched_by_regex = 0;
-
- if (rd_kafka_topic_partition_match(rkcg->rkcg_rk, rkgm, rktpar,
- topic_metadata->topic,
- &matched_by_regex)) {
- rd_list_add(&rkgm->rkgm_eligible,
- (void *)topic_metadata);
- matched++;
- has_regex += matched_by_regex;
- }
- }
-
- if (matched)
- rd_list_add(&eligible_topic->members, rkgm);
-
- if (!has_regex &&
- rd_list_cnt(&rkgm->rkgm_eligible) == rkgm->rkgm_subscription->cnt)
- return 1; /* All subscriptions matched */
- else
- return 0;
-}
-
-
-static void
-rd_kafka_assignor_topic_destroy (rd_kafka_assignor_topic_t *at) {
- rd_list_destroy(&at->members);
- rd_free(at);
-}
-
-int rd_kafka_assignor_topic_cmp (const void *_a, const void *_b) {
- const rd_kafka_assignor_topic_t *a =
- *(const rd_kafka_assignor_topic_t * const *)_a;
- const rd_kafka_assignor_topic_t *b =
- *(const rd_kafka_assignor_topic_t * const *)_b;
-
- return !strcmp(a->metadata->topic, b->metadata->topic);
-}
-
-/**
- * Maps the available topics to the group members' subscriptions
- * and updates the `member` map with the proper list of eligible topics,
- * the latter are returned in `eligible_topics`.
- */
-static void
-rd_kafka_member_subscriptions_map (rd_kafka_cgrp_t *rkcg,
- rd_list_t *eligible_topics,
- const rd_kafka_metadata_t *metadata,
- rd_kafka_group_member_t *members,
- int member_cnt) {
- int ti;
- rd_kafka_assignor_topic_t *eligible_topic = NULL;
-
- rd_list_init(eligible_topics, RD_MIN(metadata->topic_cnt, 10),
- (void *)rd_kafka_assignor_topic_destroy);
-
- /* For each topic in the cluster, scan through the member list
- * to find matching subscriptions. */
- for (ti = 0 ; ti < metadata->topic_cnt ; ti++) {
- int complete_cnt = 0;
- int i;
-
- /* Ignore topics in blacklist */
- if (rkcg->rkcg_rk->rk_conf.topic_blacklist &&
- rd_kafka_pattern_match(rkcg->rkcg_rk->rk_conf.
- topic_blacklist,
- metadata->topics[ti].topic)) {
- rd_kafka_dbg(rkcg->rkcg_rk, TOPIC, "BLACKLIST",
- "Assignor ignoring blacklisted "
- "topic \"%s\"",
- metadata->topics[ti].topic);
- continue;
- }
-
- if (!eligible_topic)
- eligible_topic = rd_calloc(1, sizeof(*eligible_topic));
-
- rd_list_init(&eligible_topic->members, member_cnt, NULL);
-
- /* For each member: scan through its topic subscription */
- for (i = 0 ; i < member_cnt ; i++) {
- /* Match topic against existing metadata,
- incl regex matching. */
- if (rd_kafka_member_subscription_match(
- rkcg, &members[i], &metadata->topics[ti],
- eligible_topic))
- complete_cnt++;
- }
-
- if (rd_list_empty(&eligible_topic->members)) {
- rd_list_destroy(&eligible_topic->members);
- continue;
- }
-
- eligible_topic->metadata = &metadata->topics[ti];
- rd_list_add(eligible_topics, eligible_topic);
- eligible_topic = NULL;
-
- if (complete_cnt == (int)member_cnt)
- break;
- }
-
- if (eligible_topic)
- rd_free(eligible_topic);
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_assignor_run (rd_kafka_cgrp_t *rkcg,
- const char *protocol_name,
- rd_kafka_metadata_t *metadata,
- rd_kafka_group_member_t *members,
- int member_cnt,
- char *errstr, size_t errstr_size) {
- rd_kafka_resp_err_t err;
- rd_kafka_assignor_t *rkas;
- rd_ts_t ts_start = rd_clock();
- int i;
- rd_list_t eligible_topics;
- int j;
-
- if (!(rkas = rd_kafka_assignor_find(rkcg->rkcg_rk, protocol_name)) ||
- !rkas->rkas_enabled) {
- rd_snprintf(errstr, errstr_size,
- "Unsupported assignor \"%s\"", protocol_name);
- return RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL;
- }
-
-
- /* Map available topics to subscribing members */
- rd_kafka_member_subscriptions_map(rkcg, &eligible_topics, metadata,
- members, member_cnt);
-
-
- if (rkcg->rkcg_rk->rk_conf.debug & RD_KAFKA_DBG_CGRP) {
- rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
- "Group \"%s\" running %s assignment for "
- "%d member(s):",
- rkcg->rkcg_group_id->str, protocol_name,
- member_cnt);
-
- for (i = 0 ; i < member_cnt ; i++) {
- const rd_kafka_group_member_t *member = &members[i];
-
- rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
- " Member \"%.*s\"%s with "
- "%d subscription(s):",
- RD_KAFKAP_STR_PR(member->rkgm_member_id),
- !rd_kafkap_str_cmp(member->rkgm_member_id,
- rkcg->rkcg_member_id) ?
- " (me)":"",
- member->rkgm_subscription->cnt);
- for (j = 0 ; j < member->rkgm_subscription->cnt ; j++) {
- const rd_kafka_topic_partition_t *p =
- &member->rkgm_subscription->elems[j];
- rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
- " %s [%"PRId32"]",
- p->topic, p->partition);
- }
- }
-
-
- }
-
- /* Call assignors assign callback */
- err = rkas->rkas_assign_cb(rkcg->rkcg_rk,
- rkcg->rkcg_member_id->str,
- protocol_name, metadata,
- members, member_cnt,
- (rd_kafka_assignor_topic_t **)
- eligible_topics.rl_elems,
- eligible_topics.rl_cnt,
- errstr, sizeof(errstr),
- rkas->rkas_opaque);
-
- if (err) {
- rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
- "Group \"%s\" %s assignment failed "
- "for %d member(s): %s",
- rkcg->rkcg_group_id->str, protocol_name,
- (int)member_cnt, errstr);
- } else if (rkcg->rkcg_rk->rk_conf.debug & RD_KAFKA_DBG_CGRP) {
- rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
- "Group \"%s\" %s assignment for %d member(s) "
- "finished in %.3fms:",
- rkcg->rkcg_group_id->str, protocol_name,
- (int)member_cnt,
- (float)(rd_clock() - ts_start)/1000.0f);
- for (i = 0 ; i < member_cnt ; i++) {
- const rd_kafka_group_member_t *member = &members[i];
-
- rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
- " Member \"%.*s\"%s assigned "
- "%d partition(s):",
- RD_KAFKAP_STR_PR(member->rkgm_member_id),
- !rd_kafkap_str_cmp(member->rkgm_member_id,
- rkcg->rkcg_member_id) ?
- " (me)":"",
- member->rkgm_assignment->cnt);
- for (j = 0 ; j < member->rkgm_assignment->cnt ; j++) {
- const rd_kafka_topic_partition_t *p =
- &member->rkgm_assignment->elems[j];
- rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
- " %s [%"PRId32"]",
- p->topic, p->partition);
- }
- }
- }
-
- rd_list_destroy(&eligible_topics);
-
- return err;
-}
-
-
-/**
- * Assignor protocol string comparator
- */
-static int rd_kafka_assignor_cmp_str (const void *_a, const void *_b) {
- const char *a = _a;
- const rd_kafka_assignor_t *b = _b;
-
- return rd_kafkap_str_cmp_str2(a, b->rkas_protocol_name);
-}
-
-/**
- * Find assignor by protocol name.
- *
- * Locality: any
- * Locks: none
- */
-rd_kafka_assignor_t *
-rd_kafka_assignor_find (rd_kafka_t *rk, const char *protocol) {
- return (rd_kafka_assignor_t *)
- rd_list_find(&rk->rk_conf.partition_assignors, protocol,
- rd_kafka_assignor_cmp_str);
-}
-
-
-/**
- * Destroys an assignor (but does not unlink).
- */
-static void rd_kafka_assignor_destroy (rd_kafka_assignor_t *rkas) {
- rd_kafkap_str_destroy(rkas->rkas_protocol_type);
- rd_kafkap_str_destroy(rkas->rkas_protocol_name);
- rd_free(rkas);
-}
-
-
-
-/**
- * Add an assignor, overwriting any previous one with the same protocol_name.
- */
-static rd_kafka_resp_err_t
-rd_kafka_assignor_add (rd_kafka_t *rk,
- rd_kafka_assignor_t **rkasp,
- const char *protocol_type,
- const char *protocol_name,
- rd_kafka_resp_err_t (*assign_cb) (
- rd_kafka_t *rk,
- const char *member_id,
- const char *protocol_name,
- const rd_kafka_metadata_t *metadata,
- rd_kafka_group_member_t *members,
- size_t member_cnt,
- rd_kafka_assignor_topic_t **eligible_topics,
- size_t eligible_topic_cnt,
- char *errstr, size_t errstr_size, void *opaque),
- void *opaque) {
- rd_kafka_assignor_t *rkas;
-
- if (rkasp)
- *rkasp = NULL;
-
- if (rd_kafkap_str_cmp_str(rk->rk_conf.group_protocol_type,
- protocol_type))
- return RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL;
-
- /* Dont overwrite application assignors */
- if ((rkas = rd_kafka_assignor_find(rk, protocol_name))) {
- if (rkasp)
- *rkasp = rkas;
- return RD_KAFKA_RESP_ERR__CONFLICT;
- }
-
- rkas = rd_calloc(1, sizeof(*rkas));
-
- rkas->rkas_protocol_name = rd_kafkap_str_new(protocol_name, -1);
- rkas->rkas_protocol_type = rd_kafkap_str_new(protocol_type, -1);
- rkas->rkas_assign_cb = assign_cb;
- rkas->rkas_get_metadata_cb = rd_kafka_assignor_get_metadata;
- rkas->rkas_opaque = opaque;
-
- rd_list_add(&rk->rk_conf.partition_assignors, rkas);
-
- if (rkasp)
- *rkasp = rkas;
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/* Right trim string of whitespaces */
-static void rtrim (char *s) {
- char *e = s + strlen(s);
-
- if (e == s)
- return;
-
- while (e >= s && isspace(*e))
- e--;
-
- *e = '\0';
-}
-
-
-/**
- * Initialize assignor list based on configuration.
- */
-int rd_kafka_assignors_init (rd_kafka_t *rk, char *errstr, size_t errstr_size) {
- char *wanted;
- char *s;
-
- rd_list_init(&rk->rk_conf.partition_assignors, 2,
- (void *)rd_kafka_assignor_destroy);
-
- rd_strdupa(&wanted, rk->rk_conf.partition_assignment_strategy);
-
- s = wanted;
- while (*s) {
- rd_kafka_assignor_t *rkas = NULL;
- char *t;
-
- /* Left trim */
- while (*s == ' ' || *s == ',')
- s++;
-
- if ((t = strchr(s, ','))) {
- *t = '\0';
- t++;
- } else {
- t = s + strlen(s);
- }
-
- /* Right trim */
- rtrim(s);
-
- /* Match builtin consumer assignors */
- if (!strcmp(s, "range"))
- rd_kafka_assignor_add(
- rk, &rkas, "consumer", "range",
- rd_kafka_range_assignor_assign_cb,
- NULL);
- else if (!strcmp(s, "roundrobin"))
- rd_kafka_assignor_add(
- rk, &rkas, "consumer", "roundrobin",
- rd_kafka_roundrobin_assignor_assign_cb,
- NULL);
- else {
- rd_snprintf(errstr, errstr_size,
- "Unsupported partition.assignment.strategy:"
- " %s", s);
- return -1;
- }
-
- if (rkas) {
- if (!rkas->rkas_enabled) {
- rkas->rkas_enabled = 1;
- rk->rk_conf.enabled_assignor_cnt++;
- }
- }
-
- s = t;
- }
-
- return 0;
-}
-
-
-
-/**
- * Free assignors
- */
-void rd_kafka_assignors_term (rd_kafka_t *rk) {
- rd_list_destroy(&rk->rk_conf.partition_assignors);
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_assignor.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_assignor.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_assignor.h
deleted file mode 100644
index 75a2dd8..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_assignor.h
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * librdkafka - The Apache Kafka C/C++ library
- *
- * Copyright (c) 2015 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-#pragma once
-
-
-
-
-
-typedef struct rd_kafka_group_member_s {
- rd_kafka_topic_partition_list_t *rkgm_subscription;
- rd_kafka_topic_partition_list_t *rkgm_assignment;
- rd_list_t rkgm_eligible;
- rd_kafkap_str_t *rkgm_member_id;
- rd_kafkap_bytes_t *rkgm_userdata;
- rd_kafkap_bytes_t *rkgm_member_metadata;
-} rd_kafka_group_member_t;
-
-
-int rd_kafka_group_member_cmp (const void *_a, const void *_b);
-
-int
-rd_kafka_group_member_find_subscription (rd_kafka_t *rk,
- const rd_kafka_group_member_t *rkgm,
- const char *topic);
-
-
-/**
- * Structure to hold metadata for a single topic and all its
- * subscribing members.
- */
-typedef struct rd_kafka_assignor_topic_s {
- const rd_kafka_metadata_topic_t *metadata;
- rd_list_t members; /* rd_kafka_group_member_t * */
-} rd_kafka_assignor_topic_t;
-
-
-int rd_kafka_assignor_topic_cmp (const void *_a, const void *_b);
-
-
-typedef struct rd_kafka_assignor_s {
- rd_kafkap_str_t *rkas_protocol_type;
- rd_kafkap_str_t *rkas_protocol_name;
-
- const void *rkas_userdata;
- size_t rkas_userdata_size;
-
- int rkas_enabled;
-
- rd_kafka_resp_err_t (*rkas_assign_cb) (
- rd_kafka_t *rk,
- const char *member_id,
- const char *protocol_name,
- const rd_kafka_metadata_t *metadata,
- rd_kafka_group_member_t *members,
- size_t member_cnt,
- rd_kafka_assignor_topic_t **eligible_topics,
- size_t eligible_topic_cnt,
- char *errstr,
- size_t errstr_size,
- void *opaque);
-
- rd_kafkap_bytes_t *(*rkas_get_metadata_cb) (
- struct rd_kafka_assignor_s *rkpas,
- const rd_list_t *topics);
-
-
- void (*rkas_on_assignment_cb) (const char *member_id,
- rd_kafka_group_member_t
- *assignment, void *opaque);
-
- void *rkas_opaque;
-} rd_kafka_assignor_t;
-
-
-rd_kafkap_bytes_t *
-rd_kafka_assignor_get_metadata (rd_kafka_assignor_t *rkpas,
- const rd_list_t *topics);
-
-
-void rd_kafka_assignor_update_subscription (rd_kafka_assignor_t *rkpas,
- const rd_kafka_topic_partition_list_t
- *subscription);
-
-
-rd_kafka_resp_err_t
-rd_kafka_assignor_run (struct rd_kafka_cgrp_s *rkcg,
- const char *protocol_name,
- rd_kafka_metadata_t *metadata,
- rd_kafka_group_member_t *members, int member_cnt,
- char *errstr, size_t errstr_size);
-
-rd_kafka_assignor_t *
-rd_kafka_assignor_find (rd_kafka_t *rk, const char *protocol);
-
-int rd_kafka_assignors_init (rd_kafka_t *rk, char *errstr, size_t errstr_size);
-void rd_kafka_assignors_term (rd_kafka_t *rk);
-
-
-
-void rd_kafka_group_member_clear (rd_kafka_group_member_t *rkgm);
-
-
-/**
- * rd_kafka_range_assignor.c
- */
-rd_kafka_resp_err_t
-rd_kafka_range_assignor_assign_cb (rd_kafka_t *rk,
- const char *member_id,
- const char *protocol_name,
- const rd_kafka_metadata_t *metadata,
- rd_kafka_group_member_t *members,
- size_t member_cnt,
- rd_kafka_assignor_topic_t **eligible_topics,
- size_t eligible_topic_cnt,
- char *errstr, size_t errstr_size,
- void *opaque);
-
-
-/**
- * rd_kafka_roundrobin_assignor.c
- */
-rd_kafka_resp_err_t
-rd_kafka_roundrobin_assignor_assign_cb (rd_kafka_t *rk,
- const char *member_id,
- const char *protocol_name,
- const rd_kafka_metadata_t *metadata,
- rd_kafka_group_member_t *members,
- size_t member_cnt,
- rd_kafka_assignor_topic_t
- **eligible_topics,
- size_t eligible_topic_cnt,
- char *errstr, size_t errstr_size,
- void *opaque);
-