Posted to issues@flink.apache.org by uce <gi...@git.apache.org> on 2016/06/03 14:55:20 UTC

[GitHub] flink pull request #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJso...

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/2069

    [FLINK-3872] [table, connector-kafka] Add KafkaJsonTableSource

    Adds `StreamTableSource` variants for Kafka with syntactic sugar for parsing JSON streams.
    
    ```java
    KafkaJsonTableSource source = new Kafka08JsonTableSource(
        topic,
        props,
        new String[] { "id" }, // field names
        new Class<?>[] { Long.class }); // field types
    
    tableEnvironment.registerTableSource("kafka-stream", source);
    ```
    
    You can then continue to work with the stream:
    
    ```java
    Table result = tableEnvironment.ingest("kafka-stream").filter("id > 1000");
    tableEnvironment.toDataStream(result, Row.class).print();
    ```
    
    **Limitations**
    - Assumes flat JSON field access (we can easily extend this to use JSON pointers, which would allow parsing nested fields like `/location/area` as field names; see the sketch after this list).
    - This does not extract any timestamps or watermarks (not an issue right now, as the Table API currently does not support operations that need them).
    - The API is somewhat cumbersome and not very Scala-esque for the Scala Table API.
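    
    For the JSON-pointer extension mentioned in the first limitation, a minimal sketch of what nested field access could look like with Jackson (illustrative only, not part of this PR):
    
    ```java
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    
    public class JsonPointerSketch {
    
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode root = mapper.readTree("{\"id\": 42, \"location\": {\"area\": \"berlin\"}}");
    
            // Flat field access, as implemented in this PR.
            JsonNode id = root.get("id");
    
            // Nested field access via a JSON pointer such as "/location/area".
            JsonNode area = root.at("/location/area");
    
            System.out.println(id.asLong() + " / " + area.asText()); // 42 / berlin
        }
    }
    ```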


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 3872-kafkajson_table

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2069.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2069
    
----
commit 12ec6a594d23bd36bed1e07eeaba2aa75a768f67
Author: Ufuk Celebi <uc...@apache.org>
Date:   2016-06-02T20:38:23Z

    [FLINK-3872] [table, connector-kafka] Add JsonRowDeserializationSchema
    
    - Adds a deserialization schema from byte[] to Row to be used in conjunction
      with the Table API.

commit a8dc3aa7ab70a91b12af2adccbbed821bf25ecc9
Author: Ufuk Celebi <uc...@apache.org>
Date:   2016-06-03T13:24:22Z

    [FLINK-3872] [table, connector-kafka] Add KafkaTableSource

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66605120
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java ---
    @@ -0,0 +1,85 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.table.sources.StreamTableSource;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * A version-agnostic Kafka JSON {@link StreamTableSource}.
    + *
    + * <p>The version-specific Kafka consumers need to extend this class and
    + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
    + *
    + * <p>The field names are used to parse the JSON file and so are the types.
    + */
    +public abstract class KafkaJsonTableSource extends KafkaTableSource {
    --- End diff --
    
    Does it make sense to add methods to configure the Jackson `ObjectMapper`?


---

[GitHub] flink pull request #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66591683
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +
    +/**
    + * Deserialization schema from JSON to {@link Row}.
    + *
    + * <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads
    + * the specified fields.
    + *
    + * <p>Failure during deserialization are forwarded as wrapped IOExceptions.
    + */
    +public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
    +
    +	/** Field names to parse. Indices match fieldTypes indices. */
    +	private final String[] fieldNames;
    +
    +	/** Types to parse fields as. Indices match fieldNames indices. */
    +	private final TypeInformation<?>[] fieldTypes;
    +
    +	/** Object mapper for parsing the JSON. */
    +	private final ObjectMapper objectMapper = new ObjectMapper();
    +
    +	/**
    +	 * Creates a JSON deserializtion schema for the given fields and type classes.
    +	 *
    +	 * @param fieldNames Names of JSON fields to parse.
    +	 * @param fieldTypes Type classes to parse JSON fields as.
    +	 */
    +	public JsonRowDeserializationSchema(String[] fieldNames, Class<?>[] fieldTypes) {
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
    +
    +		this.fieldTypes = new TypeInformation[fieldTypes.length];
    +		for (int i = 0; i < fieldTypes.length; i++) {
    +			this.fieldTypes[i] = TypeExtractor.getForClass(fieldTypes[i]);
    +		}
    +
    +		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
    +				"Number of provided field names and types does not match.");
    +	}
    +
    +	/**
    +	 * Creates a JSON deserializtion schema for the given fields and types.
    +	 *
    +	 * @param fieldNames Names of JSON fields to parse.
    +	 * @param fieldTypes Types to parse JSON fields as.
    +	 */
    +	public JsonRowDeserializationSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
    +		this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
    +
    +		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
    +				"Number of provided field names and types does not match.");
    +	}
    +
    +	@Override
    +	public Row deserialize(byte[] message) throws IOException {
    +		try {
    +			JsonNode root = objectMapper.readTree(message);
    +
    +			Row row = new Row(fieldNames.length);
    +			for (int i = 0; i < fieldNames.length; i++) {
    +				JsonNode node = root.get(fieldNames[i]);
    +
    +				if (node == null) {
    --- End diff --
    
    Should we make the behavior for `node == null` configurable? An alternative could be to tolerate missing fields and set them to `null`.
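    
    A minimal sketch of such a switch in `JsonRowDeserializationSchema#deserialize` (the flag name `failOnMissingField` is just an example):
    
    ```java
    // Hypothetical flag; defaults to the current fail-fast behavior.
    private boolean failOnMissingField = true;
    
    public void setFailOnMissingField(boolean failOnMissingField) {
        this.failOnMissingField = failOnMissingField;
    }
    
    @Override
    public Row deserialize(byte[] message) throws IOException {
        try {
            JsonNode root = objectMapper.readTree(message);
    
            Row row = new Row(fieldNames.length);
            for (int i = 0; i < fieldNames.length; i++) {
                JsonNode node = root.get(fieldNames[i]);
    
                if (node == null) {
                    if (failOnMissingField) {
                        throw new IllegalStateException("Failed to find field with name '" + fieldNames[i] + "'.");
                    } else {
                        // Tolerate the missing field and emit null instead.
                        row.setField(i, null);
                    }
                } else {
                    // Convert the JSON node to the declared field type.
                    Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
                    row.setField(i, value);
                }
            }
    
            return row;
        } catch (Throwable t) {
            throw new IOException("Failed to deserialize JSON object.", t);
        }
    }
    ```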


---

[GitHub] flink pull request #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66608120
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java ---
    @@ -0,0 +1,85 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.table.sources.StreamTableSource;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * A version-agnostic Kafka JSON {@link StreamTableSource}.
    + *
    + * <p>The version-specific Kafka consumers need to extend this class and
    + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
    + *
    + * <p>The field names are used to parse the JSON file and so are the types.
    + */
    +public abstract class KafkaJsonTableSource extends KafkaTableSource {
    --- End diff --
    
    I had a look at `com.fasterxml.jackson.databind.DeserializationFeature` and thought that it might make sense to expose these options.
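    
    For example, as a rough sketch (the `configure` method name and builder style are just one option):
    
    ```java
    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;
    
    // Sketch: expose Jackson's DeserializationFeature switches on the schema.
    public class ConfigurableJsonSchemaSketch {
    
        private final ObjectMapper objectMapper = new ObjectMapper();
    
        public ConfigurableJsonSchemaSketch configure(DeserializationFeature feature, boolean enabled) {
            objectMapper.configure(feature, enabled);
            return this;
        }
    }
    ```
    
    which users could then call like `schema.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, true)`.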


---

[GitHub] flink issue #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJsonTableS...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2069
  
    Thanks for the suggestion, Fabian. I've added the table to the docs and added an example about the Kafka JSON source. Furthermore, I've added the configuration flag for the missing-field failure behaviour. I'll merge this after my local Travis build passes.


---

[GitHub] flink issue #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJsonTableS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2069
  
    Hi @uce, thanks for the PR. Looks really good. I added three comments inline. 
    
    Regarding the watermarks and timestamp handling, I am thinking about introducing an `EventTimeTableSource` interface that provides all required methods. A `StreamTableSource` can then implement this interface to support event-time. I will open a JIRA for this.
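    
    Very roughly, such an interface could look like this (a sketch only; the actual names and signatures would be worked out in the JIRA):
    
    ```java
    import org.apache.flink.streaming.api.datastream.DataStream;
    
    // Hypothetical sketch of an event-time-aware table source.
    public interface EventTimeTableSource<T> {
    
        /** Name of the field carrying the event-time timestamp. */
        String getTimestampField();
    
        /** Assigns timestamps and watermarks to the source stream. */
        DataStream<T> assignTimestampsAndWatermarks(DataStream<T> stream);
    }
    ```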


---

[GitHub] flink pull request #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66640134
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java ---
    @@ -0,0 +1,85 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.table.sources.StreamTableSource;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * A version-agnostic Kafka JSON {@link StreamTableSource}.
    + *
    + * <p>The version-specific Kafka consumers need to extend this class and
    + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
    + *
    + * <p>The field names are used to parse the JSON file and so are the types.
    + */
    +public abstract class KafkaJsonTableSource extends KafkaTableSource {
    --- End diff --
    
    I think we can add this later as well.


---

[GitHub] flink pull request #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJso...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66606860
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java ---
    @@ -0,0 +1,85 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.table.sources.StreamTableSource;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * A version-agnostic Kafka JSON {@link StreamTableSource}.
    + *
    + * <p>The version-specific Kafka consumers need to extend this class and
    + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
    + *
    + * <p>The field names are used to parse the JSON file and so are the types.
    + */
    +public abstract class KafkaJsonTableSource extends KafkaTableSource {
    --- End diff --
    
    Maybe. 😄 What do you have in mind? Users can also use the generic KafkaTableSource with their own row deserialization schema. Maybe that's more flexible. What do you think?
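    
    For reference, a custom row schema for the generic `KafkaTableSource` would roughly look like this (a simplified sketch that just wraps the raw bytes into a single string field):
    
    ```java
    import java.io.IOException;
    
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.flink.api.table.Row;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    
    public class MyRowDeserializationSchema implements DeserializationSchema<Row> {
    
        @Override
        public Row deserialize(byte[] message) throws IOException {
            // Parse the raw bytes in whatever format the topic uses
            // and populate the Row fields accordingly.
            Row row = new Row(1);
            row.setField(0, new String(message));
            return row;
        }
    
        @Override
        public boolean isEndOfStream(Row nextElement) {
            return false;
        }
    
        @Override
        public TypeInformation<Row> getProducedType() {
            return TypeExtractor.getForClass(Row.class);
        }
    }
    ```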


---

[GitHub] flink pull request #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJso...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66606127
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +
    +/**
    + * Deserialization schema from JSON to {@link Row}.
    + *
    + * <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads
    + * the specified fields.
    + *
    + * <p>Failure during deserialization are forwarded as wrapped IOExceptions.
    + */
    +public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
    +
    +	/** Field names to parse. Indices match fieldTypes indices. */
    +	private final String[] fieldNames;
    +
    +	/** Types to parse fields as. Indices match fieldNames indices. */
    +	private final TypeInformation<?>[] fieldTypes;
    +
    +	/** Object mapper for parsing the JSON. */
    +	private final ObjectMapper objectMapper = new ObjectMapper();
    +
    +	/**
    +	 * Creates a JSON deserializtion schema for the given fields and type classes.
    +	 *
    +	 * @param fieldNames Names of JSON fields to parse.
    +	 * @param fieldTypes Type classes to parse JSON fields as.
    +	 */
    +	public JsonRowDeserializationSchema(String[] fieldNames, Class<?>[] fieldTypes) {
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
    +
    +		this.fieldTypes = new TypeInformation[fieldTypes.length];
    +		for (int i = 0; i < fieldTypes.length; i++) {
    +			this.fieldTypes[i] = TypeExtractor.getForClass(fieldTypes[i]);
    +		}
    +
    +		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
    +				"Number of provided field names and types does not match.");
    +	}
    +
    +	/**
    +	 * Creates a JSON deserializtion schema for the given fields and types.
    +	 *
    +	 * @param fieldNames Names of JSON fields to parse.
    +	 * @param fieldTypes Types to parse JSON fields as.
    +	 */
    +	public JsonRowDeserializationSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
    +		this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
    +
    +		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
    +				"Number of provided field names and types does not match.");
    +	}
    +
    +	@Override
    +	public Row deserialize(byte[] message) throws IOException {
    +		try {
    +			JsonNode root = objectMapper.readTree(message);
    +
    +			Row row = new Row(fieldNames.length);
    +			for (int i = 0; i < fieldNames.length; i++) {
    +				JsonNode node = root.get(fieldNames[i]);
    +
    +				if (node == null) {
    --- End diff --
    
    Good idea. It would actually make sense to default to that behaviour. Otherwise a malformed JSON message will kill the job.


---

[GitHub] flink pull request #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66591344
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.sources.StreamTableSource;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Properties;
    +
    +/**
    + * A version-agnostic Kafka {@link StreamTableSource}.
    + *
    + * <p>The version-specific Kafka consumers need to extend this class and
    + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
    + */
    +abstract class KafkaTableSource implements StreamTableSource<Row> {
    +
    +	/** The Kafka topic to consume. */
    +	private final String topic;
    +
    +	/** Properties for the Kafka consumer. */
    +	private final Properties properties;
    +
    +	/** Deserialization schema to use for Kafka records. */
    +	private final DeserializationSchema<Row> deserializationSchema;
    +
    +	/** Row field names. */
    +	private final String[] fieldNames;
    +
    +	/** Row field types. */
    +	private final TypeInformation<?>[] fieldTypes;
    +
    +	/**
    +	 * Creates a generic Kafka {@link StreamTableSource}.
    +	 *
    +	 * @param topic                 Kafka topic to consume.
    +	 * @param properties            Properties for the Kafka consumer.
    +	 * @param deserializationSchema Deserialization schema to use for Kafka records.
    +	 * @param fieldNames            Row field names.
    +	 * @param fieldTypes            Row field types.
    +	 */
    +	KafkaTableSource(
    +			String topic,
    +			Properties properties,
    +			DeserializationSchema<Row> deserializationSchema,
    +			String[] fieldNames,
    +			Class<?>[] fieldTypes) {
    +
    +		this(topic, properties, deserializationSchema, fieldNames, toTypeInfo(fieldTypes));
    +	}
    +
    +	/**
    +	 * Creates a generic Kafka {@link StreamTableSource}.
    +	 *
    +	 * @param topic                 Kafka topic to consume.
    +	 * @param properties            Properties for the Kafka consumer.
    +	 * @param deserializationSchema Deserialization schema to use for Kafka records.
    +	 * @param fieldNames            Row field names.
    +	 * @param fieldTypes            Row field types.
    +	 */
    +	KafkaTableSource(
    +			String topic,
    +			Properties properties,
    +			DeserializationSchema<Row> deserializationSchema,
    +			String[] fieldNames,
    +			TypeInformation<?>[] fieldTypes) {
    +
    +		this.topic = Preconditions.checkNotNull(topic, "Topic");
    +		this.properties = Preconditions.checkNotNull(properties, "Properties");
    +		this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
    +		this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
    +
    +		Preconditions.checkArgument(fieldNames.length == fieldNames.length,
    --- End diff --
    
    should be `fieldNames.length == fieldTypes.length`


---

[GitHub] flink pull request #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJso...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66606216
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -735,6 +738,123 @@ public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> o
     		}
     	}
     
    +	/**
    +	 * Runs a table source test with JSON data.
    +	 *
    +	 * The table source needs to parse the following JSON fields:
    +	 * - "long" -> number
    +	 * - "string" -> "string"
    +	 * - "boolean" -> true|false
    +	 * - "double" -> fraction
    +	 */
    +	public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception {
    +		final ObjectMapper mapper = new ObjectMapper();
    +
    +		final int numElements = 1024;
    +		final long[] longs = new long[numElements];
    +		final String[] strings = new String[numElements];
    +		final boolean[] booleans = new boolean[numElements];
    +		final double[] doubles = new double[numElements];
    +
    +		final byte[][] serializedJson = new byte[numElements][];
    +
    +		ThreadLocalRandom random = ThreadLocalRandom.current();
    +
    +		for (int i = 0; i < numElements; i++) {
    +			longs[i] = random.nextLong();
    +			strings[i] = Integer.toHexString(random.nextInt());
    +			booleans[i] = random.nextBoolean();
    +			doubles[i] = random.nextDouble();
    +
    +			ObjectNode entry = mapper.createObjectNode();
    +			entry.put("long", longs[i]);
    +			entry.put("string", strings[i]);
    +			entry.put("boolean", booleans[i]);
    +			entry.put("double", doubles[i]);
    +
    +			serializedJson[i] = mapper.writeValueAsBytes(entry);
    +		}
    +
    +		// Produce serialized JSON data
    +		createTestTopic(topic, 1, 1);
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment
    +				.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +
    +		env.addSource(new SourceFunction<byte[]>() {
    +			@Override
    +			public void run(SourceContext<byte[]> ctx) throws Exception {
    +				for (int i = 0; i < numElements; i++) {
    +					ctx.collect(serializedJson[i]);
    +				}
    +			}
    +
    +			@Override
    +			public void cancel() {
    +			}
    +		}).addSink(kafkaServer.getProducer(
    +				topic,
    +				new ByteArraySerializationSchema(),
    +				standardProps,
    +				null));
    +
    +		// Execute blocks
    +		env.execute();
    +
    +		// Register as table source
    +		StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
    +		tableEnvironment.registerTableSource("kafka", kafkaTableSource);
    +
    +		Table result = tableEnvironment.ingest("kafka");
    +
    +		tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() {
    +
    +			int i = 0;
    +
    +			@Override
    +			public void invoke(Row value) throws Exception {
    +				if (i > numElements) {
    +					throw new IllegalStateException("Received too many rows.");
    +				}
    +
    +				assertEquals(longs[i], value.productElement(0));
    +				assertEquals(strings[i], value.productElement(1));
    +				assertEquals(booleans[i], value.productElement(2));
    +				assertEquals(doubles[i], value.productElement(3));
    +
    +				if (i == numElements-1) {
    +					throw new SuccessException();
    --- End diff --
    
    Yes, the first check can be removed. I think it's fine to just have the exact check in place.


---

[GitHub] flink issue #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJsonTableS...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2069
  
    Thanks for the review, Fabian. I've addressed all comments except the ObjectMapper configuration. The EventTimeTableSource makes sense. We can make the Kafka source extend from that one later on. Do you think we can merge this now?


---

[GitHub] flink issue #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJsonTableS...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2069
  
    Test failure is unrelated, going to merge this.


---

[GitHub] flink pull request #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJso...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2069


---

[GitHub] flink issue #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJsonTableS...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2069
  
    Quickly checked the Maven and Kafka stuff. Looks good. Very clean, well-documented, and well-tested code.
    Further review of the Table API-specific parts is still needed.


---

[GitHub] flink pull request #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJso...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66606072
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.sources.StreamTableSource;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Properties;
    +
    +/**
    + * A version-agnostic Kafka {@link StreamTableSource}.
    + *
    + * <p>The version-specific Kafka consumers need to extend this class and
    + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
    + */
    +abstract class KafkaTableSource implements StreamTableSource<Row> {
    +
    +	/** The Kafka topic to consume. */
    +	private final String topic;
    +
    +	/** Properties for the Kafka consumer. */
    +	private final Properties properties;
    +
    +	/** Deserialization schema to use for Kafka records. */
    +	private final DeserializationSchema<Row> deserializationSchema;
    +
    +	/** Row field names. */
    +	private final String[] fieldNames;
    +
    +	/** Row field types. */
    +	private final TypeInformation<?>[] fieldTypes;
    +
    +	/**
    +	 * Creates a generic Kafka {@link StreamTableSource}.
    +	 *
    +	 * @param topic                 Kafka topic to consume.
    +	 * @param properties            Properties for the Kafka consumer.
    +	 * @param deserializationSchema Deserialization schema to use for Kafka records.
    +	 * @param fieldNames            Row field names.
    +	 * @param fieldTypes            Row field types.
    +	 */
    +	KafkaTableSource(
    +			String topic,
    +			Properties properties,
    +			DeserializationSchema<Row> deserializationSchema,
    +			String[] fieldNames,
    +			Class<?>[] fieldTypes) {
    +
    +		this(topic, properties, deserializationSchema, fieldNames, toTypeInfo(fieldTypes));
    +	}
    +
    +	/**
    +	 * Creates a generic Kafka {@link StreamTableSource}.
    +	 *
    +	 * @param topic                 Kafka topic to consume.
    +	 * @param properties            Properties for the Kafka consumer.
    +	 * @param deserializationSchema Deserialization schema to use for Kafka records.
    +	 * @param fieldNames            Row field names.
    +	 * @param fieldTypes            Row field types.
    +	 */
    +	KafkaTableSource(
    +			String topic,
    +			Properties properties,
    +			DeserializationSchema<Row> deserializationSchema,
    +			String[] fieldNames,
    +			TypeInformation<?>[] fieldTypes) {
    +
    +		this.topic = Preconditions.checkNotNull(topic, "Topic");
    +		this.properties = Preconditions.checkNotNull(properties, "Properties");
    +		this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
    +		this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
    +
    +		Preconditions.checkArgument(fieldNames.length == fieldNames.length,
    --- End diff --
    
    Yes! Good catch 


---

[GitHub] flink issue #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJsonTableS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2069
  
    Hi @uce, the update looks good. I think we should add the new TableSources to the Table API documentation, maybe by adding a table like this to `table.md`:
    
    | Class name | Maven dependency | Batch | Streaming | Description |
    | ---------- | ---------------- | ----- | --------- | ----------- |
    | `CsvTableSource` | `flink-table` | Y | Y | ... |
    | `Kafka08JsonTableSource` | `flink-connector-kafka-0.8` | N | Y | ... |



---

[GitHub] flink pull request #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66603012
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -735,6 +738,123 @@ public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> o
     		}
     	}
     
    +	/**
    +	 * Runs a table source test with JSON data.
    +	 *
    +	 * The table source needs to parse the following JSON fields:
    +	 * - "long" -> number
    +	 * - "string" -> "string"
    +	 * - "boolean" -> true|false
    +	 * - "double" -> fraction
    +	 */
    +	public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception {
    +		final ObjectMapper mapper = new ObjectMapper();
    +
    +		final int numElements = 1024;
    +		final long[] longs = new long[numElements];
    +		final String[] strings = new String[numElements];
    +		final boolean[] booleans = new boolean[numElements];
    +		final double[] doubles = new double[numElements];
    +
    +		final byte[][] serializedJson = new byte[numElements][];
    +
    +		ThreadLocalRandom random = ThreadLocalRandom.current();
    +
    +		for (int i = 0; i < numElements; i++) {
    +			longs[i] = random.nextLong();
    +			strings[i] = Integer.toHexString(random.nextInt());
    +			booleans[i] = random.nextBoolean();
    +			doubles[i] = random.nextDouble();
    +
    +			ObjectNode entry = mapper.createObjectNode();
    +			entry.put("long", longs[i]);
    +			entry.put("string", strings[i]);
    +			entry.put("boolean", booleans[i]);
    +			entry.put("double", doubles[i]);
    +
    +			serializedJson[i] = mapper.writeValueAsBytes(entry);
    +		}
    +
    +		// Produce serialized JSON data
    +		createTestTopic(topic, 1, 1);
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment
    +				.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +
    +		env.addSource(new SourceFunction<byte[]>() {
    +			@Override
    +			public void run(SourceContext<byte[]> ctx) throws Exception {
    +				for (int i = 0; i < numElements; i++) {
    +					ctx.collect(serializedJson[i]);
    +				}
    +			}
    +
    +			@Override
    +			public void cancel() {
    +			}
    +		}).addSink(kafkaServer.getProducer(
    +				topic,
    +				new ByteArraySerializationSchema(),
    +				standardProps,
    +				null));
    +
    +		// Execute blocks
    +		env.execute();
    +
    +		// Register as table source
    +		StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
    +		tableEnvironment.registerTableSource("kafka", kafkaTableSource);
    +
    +		Table result = tableEnvironment.ingest("kafka");
    +
    +		tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() {
    +
    +			int i = 0;
    +
    +			@Override
    +			public void invoke(Row value) throws Exception {
    +				if (i > numElements) {
    +					throw new IllegalStateException("Received too many rows.");
    +				}
    +
    +				assertEquals(longs[i], value.productElement(0));
    +				assertEquals(strings[i], value.productElement(1));
    +				assertEquals(booleans[i], value.productElement(2));
    +				assertEquals(doubles[i], value.productElement(3));
    +
    +				if (i == numElements-1) {
    +					throw new SuccessException();
    --- End diff --
    
    Doesn't this prevent checking whether the source emits too many records, i.e., the check in line 817 would never evaluate to `true`, right?


---

[GitHub] flink pull request #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66591383
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.sources.StreamTableSource;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Properties;
    +
    +/**
    + * A version-agnostic Kafka {@link StreamTableSource}.
    + *
    + * <p>The version-specific Kafka consumers need to extend this class and
    + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
    + */
    +abstract class KafkaTableSource implements StreamTableSource<Row> {
    +
    +	/** The Kafka topic to consume. */
    +	private final String topic;
    +
    +	/** Properties for the Kafka consumer. */
    +	private final Properties properties;
    +
    +	/** Deserialization schema to use for Kafka records. */
    +	private final DeserializationSchema<Row> deserializationSchema;
    +
    +	/** Row field names. */
    +	private final String[] fieldNames;
    +
    +	/** Row field types. */
    +	private final TypeInformation<?>[] fieldTypes;
    +
    +	/**
    +	 * Creates a generic Kafka {@link StreamTableSource}.
    +	 *
    +	 * @param topic                 Kafka topic to consume.
    +	 * @param properties            Properties for the Kafka consumer.
    +	 * @param deserializationSchema Deserialization schema to use for Kafka records.
    +	 * @param fieldNames            Row field names.
    +	 * @param fieldTypes            Row field types.
    +	 */
    +	KafkaTableSource(
    +			String topic,
    +			Properties properties,
    +			DeserializationSchema<Row> deserializationSchema,
    +			String[] fieldNames,
    +			Class<?>[] fieldTypes) {
    +
    +		this(topic, properties, deserializationSchema, fieldNames, toTypeInfo(fieldTypes));
    +	}
    +
    +	/**
    +	 * Creates a generic Kafka {@link StreamTableSource}.
    +	 *
    +	 * @param topic                 Kafka topic to consume.
    +	 * @param properties            Properties for the Kafka consumer.
    +	 * @param deserializationSchema Deserialization schema to use for Kafka records.
    +	 * @param fieldNames            Row field names.
    +	 * @param fieldTypes            Row field types.
    +	 */
    +	KafkaTableSource(
    +			String topic,
    +			Properties properties,
    +			DeserializationSchema<Row> deserializationSchema,
    +			String[] fieldNames,
    +			TypeInformation<?>[] fieldTypes) {
    +
    +		this.topic = Preconditions.checkNotNull(topic, "Topic");
    +		this.properties = Preconditions.checkNotNull(properties, "Properties");
    +		this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
    +		this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
    +
    +		Preconditions.checkArgument(fieldNames.length == fieldNames.length,
    +				"Number of provided field names and types does not match.");
    +	}
    --- End diff --
    
    Can you add a check that field names are unique?
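    
    A minimal sketch of such a check (for illustration only):
    
    ```java
    import java.util.HashSet;
    import java.util.Set;
    
    final class FieldNameChecks {
    
        /** Throws if the given field names contain duplicates. */
        static void checkFieldNamesUnique(String[] fieldNames) {
            Set<String> seen = new HashSet<>();
            for (String fieldName : fieldNames) {
                if (!seen.add(fieldName)) {
                    throw new IllegalArgumentException("Duplicate field name: " + fieldName);
                }
            }
        }
    
        private FieldNameChecks() {
        }
    }
    ```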


---