Posted to issues@flink.apache.org by mushketyk <gi...@git.apache.org> on 2016/07/13 21:45:29 UTC

[GitHub] flink pull request #2244: Kafka json

GitHub user mushketyk opened a pull request:

    https://github.com/apache/flink/pull/2244

    Kafka json

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mushketyk/flink kafka-json

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2244.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2244
    
----
commit 9ff33698e44afc005360d8acb10fdbf2ccba814b
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-07-05T21:00:18Z

    [FLINK-3874] Implement KafkaJsonTableSink

commit 3eeb1dcd0f4febe37f92725bc94f3d3b13e3368f
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-07-13T21:43:13Z

    [FLINK-3874] Implement tests for CsvTableSink

----



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    This PR has not been reviewed during the last 14 days.
    Can anyone please review it?



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74700122
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +
    +/**
    + * Serialization schema that serializes an object into a JSON bytes.
    + *
    + * <p>Serializes the input {@link Row} object into a JSON string and
    + * converts it into <code>byte[]</code>.
    + *
    + * <p>Result <bode>byte[]</bode> messages can be deserialized using
    + * {@link JsonRowDeserializationSchema}.
    + */
    +public class JsonRowSerializationSchema implements SerializationSchema<Row> {
    +	/** Fields names in the input Row object */
    +	private final String[] fieldNames;
    +	/** Object mapper that is used to create output JSON objects */
    +	private static ObjectMapper mapper = new ObjectMapper();
    +
    +	/**
    +	 * Creates a JSON serialization schema for the given fields and types.
    +	 *
    +	 * @param fieldNames Names of JSON fields to parse.
    +	 */
    +	public JsonRowSerializationSchema(String[] fieldNames) {
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames);
    +	}
    +
    +	@Override
    +	public byte[] serialize(Row row) {
    +		if (row.productArity() != fieldNames.length) {
    +			throw new IllegalStateException(String.format(
    +				"Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
    +		}
    +
    +		ObjectNode objectNode = mapper.createObjectNode();
    +		for (int i = 0; i < row.productArity(); i++) {
    +			JsonNode node = mapper.valueToTree(row.productElement(i));
    +			objectNode.set(fieldNames[i], node);
    +		}
    +
    +		try {
    +			return mapper.writeValueAsBytes(objectNode);
    +		} catch (Exception e) {
    +			throw new RuntimeException("Failed to serialize row", e);
    --- End diff --
    
    BTW, it seems that changing the interface will not break the japicmp check. Maybe we can change it, I'm not sure.
    
    @twalthr @aljoscha what do you think?



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74698003
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +
    +/**
    + * Serialization schema that serializes an object into a JSON bytes.
    + *
    + * <p>Serializes the input {@link Row} object into a JSON string and
    + * converts it into <code>byte[]</code>.
    + *
    + * <p>Result <bode>byte[]</bode> messages can be deserialized using
    + * {@link JsonRowDeserializationSchema}.
    + */
    +public class JsonRowSerializationSchema implements SerializationSchema<Row> {
    +	/** Fields names in the input Row object */
    +	private final String[] fieldNames;
    +	/** Object mapper that is used to create output JSON objects */
    +	private static ObjectMapper mapper = new ObjectMapper();
    +
    +	/**
    +	 * Creates a JSON serialization schema for the given fields and types.
    +	 *
    +	 * @param fieldNames Names of JSON fields to parse.
    +	 */
    +	public JsonRowSerializationSchema(String[] fieldNames) {
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames);
    +	}
    +
    +	@Override
    +	public byte[] serialize(Row row) {
    +		if (row.productArity() != fieldNames.length) {
    +			throw new IllegalStateException(String.format(
    +				"Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
    +		}
    +
    +		ObjectNode objectNode = mapper.createObjectNode();
    +		for (int i = 0; i < row.productArity(); i++) {
    +			JsonNode node = mapper.valueToTree(row.productElement(i));
    +			objectNode.set(fieldNames[i], node);
    +		}
    +
    +		try {
    +			return mapper.writeValueAsBytes(objectNode);
    +		} catch (Exception e) {
    +			throw new RuntimeException("Failed to serialize row", e);
    --- End diff --
    
    I think it would be better, but I implement the **SerializationSchema** interface here and it does not declare any checked exceptions. I could change the interface, but it is marked as Public, so I am not sure if this is a good idea.
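
As a hedged illustration of that constraint (simplified types, not the PR's exact code): an interface method that declares no checked exceptions forces implementations to wrap them.

    import java.io.UnsupportedEncodingException;

    // Simplified stand-in for Flink's @Public SerializationSchema:
    // serialize() declares no checked exceptions.
    interface SimpleSerializationSchema<T> {
        byte[] serialize(T element);
    }

    class WrappingSchema implements SimpleSerializationSchema<String> {
        @Override
        public byte[] serialize(String element) {
            try {
                return toBytes(element); // may throw a checked exception
            } catch (UnsupportedEncodingException e) {
                // the interface cannot declare it, so it is wrapped
                throw new RuntimeException("Failed to serialize row", e);
            }
        }

        private byte[] toBytes(String element) throws UnsupportedEncodingException {
            return element.getBytes("UTF-8");
        }
    }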



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74633765
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.sink
    +
    +import java.io.File
    +import java.nio.charset.Charset
    +import java.nio.file.Files
    +import java.util.Collections
    +
    +import org.apache.flink.api.java.ExecutionEnvironment
    +import org.apache.flink.api.java.operators.DataSource
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.api.table.sinks.CsvTableSink
    +import org.junit.rules.TemporaryFolder
    +import org.junit.{Rule, Test}
    +import org.junit.Assert._
    +
    +class CsvTableSinkTest {
    --- End diff --
    
    Renamed the tests to follow the common testXXX() format.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74701669
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala ---
    @@ -0,0 +1,68 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.sinks
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +
    +trait TableSinkBase[T] extends TableSink[T] {
    --- End diff --
    
    Do you mean that a better way could be to make TableSink abstract instead of extracting methods out of it? It's a good idea, but I wanted to keep TableSink similar to TableSource (being a trait with no method implementations).



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    @wuchong @twalthr I've updated the PR according to your review. Could you please review it one more time?



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75835433
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
    + */
    +public class Kafka08JsonTableSink extends KafkaJsonTableSink {
    +	/**
    +	 * Creates {@link KafkaTableSink} for Kafka 0.8
    +	 *
    +	 * @param topic topic in Kafka
    +	 * @param properties properties to connect to Kafka
    +	 * @param partitioner Kafka partitioner
    +	 */
    +	public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
    +		super(topic, properties, partitioner);
    +	}
    +
    +	@Override
    +	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
    +		return new FlinkKafkaProducer08<Row>(topic,serializationSchema, properties, partitioner);
    --- End diff --
    
    add a space after `String topic,`



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r70800080
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.io.DatumWriter;
    +import org.apache.avro.io.Encoder;
    +import org.apache.avro.io.EncoderFactory;
    +import org.apache.avro.reflect.ReflectData;
    +import org.apache.avro.reflect.ReflectDatumReader;
    +import org.apache.avro.reflect.ReflectDatumWriter;
    +import org.apache.flink.streaming.util.serialization.AvroSerializationSchema;
    +import org.junit.Test;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.Objects;
    +
    +import static javafx.scene.input.KeyCode.T;
    --- End diff --
    
    What's the javafx import for?



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75836644
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +
    +/**
    + * Serialization schema that serializes an object into a JSON bytes.
    + *
    + * <p>Serializes the input {@link Row} object into a JSON string and
    + * converts it into <code>byte[]</code>.
    + *
    + * <p>Result <bode>byte[]</bode> messages can be deserialized using
    + * {@link JsonRowDeserializationSchema}.
    + */
    +public class JsonRowSerializationSchema implements SerializationSchema<Row> {
    +	/** Fields names in the input Row object */
    +	private final String[] fieldNames;
    +	/** Object mapper that is used to create output JSON objects */
    +	private static ObjectMapper mapper = new ObjectMapper();
    +
    +	/**
    +	 * Creates a JSON serialization schema for the given fields and types.
    +	 *
    +	 * @param fieldNames Names of JSON fields to parse.
    +	 */
    +	public JsonRowSerializationSchema(String[] fieldNames) {
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames);
    +	}
    +
    +	@Override
    +	public byte[] serialize(Row row) {
    +		if (row.productArity() != fieldNames.length) {
    +			throw new IllegalStateException(String.format(
    +				"Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
    +		}
    +
    +		ObjectNode objectNode = mapper.createObjectNode();
    +		for (int i = 0; i < row.productArity(); i++) {
    +			JsonNode node = mapper.valueToTree(row.productElement(i));
    +			objectNode.set(fieldNames[i], node);
    +		}
    +
    +		try {
    +			return mapper.writeValueAsBytes(objectNode);
    +		} catch (Exception e) {
    +			throw new RuntimeException("Failed to serialize row", e);
    --- End diff --
    
    I don't think we can change the interface.
    @wuchong, if japicmp does not complain, we should check what is going wrong here.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75835841
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
    +import org.junit.Test;
    +
    +public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
    +	@Test
    +	public void kafka09JsonTableSinkTest() throws Exception {
    --- End diff --
    
    If we add a `@Test` annotation to `KafkaTableSinkTestBase.testKafkaTableSink()` we do not need this method.
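
A hedged sketch of that suggestion (class names follow the PR, bodies are placeholders): JUnit 4 runs `@Test` methods inherited from a superclass, so each version-specific subclass only has to provide the sink.

    import org.junit.Test;

    abstract class KafkaTableSinkTestBase {

        // annotating the base method makes JUnit run it for every subclass
        @Test
        public void testKafkaTableSink() throws Exception {
            Object sink = createTableSink();
            // common assertions against the configured sink would go here
        }

        protected abstract Object createTableSink();
    }

    class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
        @Override
        protected Object createTableSink() {
            return new Object(); // the PR would build a Kafka09JsonTableSink here
        }
    }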



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75835251
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala ---
    @@ -41,46 +38,13 @@ trait TableSink[T] {
         */
       def getOutputType: TypeInformation[T]
     
    -  /** Return a deep copy of the [[TableSink]]. */
    -  protected def copy: TableSink[T]
    -
    -  /**
    -    * Return the field names of the [[org.apache.flink.api.table.Table]] to emit. */
    -  protected final def getFieldNames: Array[String] = {
    -    fieldNames match {
    -      case Some(n) => n
    -      case None => throw new IllegalStateException(
    -        "TableSink must be configured to retrieve field names.")
    -    }
    -  }
    -
    -  /** Return the field types of the [[org.apache.flink.api.table.Table]] to emit. */
    -  protected final def getFieldTypes: Array[TypeInformation[_]] = {
    -    fieldTypes match {
    -      case Some(t) => t
    -      case None => throw new IllegalStateException(
    -        "TableSink must be configured to retrieve field types.")
    -    }
    -  }
    -
    -  /**
    -    * Return a copy of this [[TableSink]] configured with the field names and types of the
    --- End diff --
    
    Please keep the documentation for `configure()`. It is crucial that `configure()` returns a copy and not the same object.
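
The contract can be sketched as follows (a Java paraphrase of the Scala trait, with simplified types): `configure()` configures a fresh copy and leaves the receiver untouched.

    import org.apache.flink.api.common.typeinfo.TypeInformation;

    abstract class CopyOnConfigureSink<T> {
        private String[] fieldNames;
        private TypeInformation<?>[] fieldTypes;

        /** Return a deep copy of this sink. */
        protected abstract CopyOnConfigureSink<T> copy();

        /** Returns a configured copy; this instance is not mutated. */
        public final CopyOnConfigureSink<T> configure(
                String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            CopyOnConfigureSink<T> configured = copy(); // never "this"
            configured.fieldNames = fieldNames;
            configured.fieldTypes = fieldTypes;
            return configured;
        }
    }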



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    The build is now passing. Could someone please review this PR?



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75835356
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -26,6 +26,8 @@ import org.apache.flink.api.table.plan.logical._
     import org.apache.flink.api.table.sinks.TableSink
     
     import scala.collection.JavaConverters._
    +import org.apache.flink.api.table.sinks.{TableSink, TableSinkBase}
    --- End diff --
    
    Can these imports be removed?



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75836302
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +
    +/**
    + * Serialization schema that serializes an object into a JSON bytes.
    + *
    + * <p>Serializes the input {@link Row} object into a JSON string and
    + * converts it into <code>byte[]</code>.
    + *
    + * <p>Result <bode>byte[]</bode> messages can be deserialized using
    + * {@link JsonRowDeserializationSchema}.
    + */
    +public class JsonRowSerializationSchema implements SerializationSchema<Row> {
    +	/** Fields names in the input Row object */
    +	private final String[] fieldNames;
    +	/** Object mapper that is used to create output JSON objects */
    +	private static ObjectMapper mapper = new ObjectMapper();
    +
    +	/**
    +	 * Creates a JSON serialization schema for the given fields and types.
    +	 *
    +	 * @param fieldNames Names of JSON fields to parse.
    +	 */
    +	public JsonRowSerializationSchema(String[] fieldNames) {
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames);
    +	}
    +
    +	@Override
    +	public byte[] serialize(Row row) {
    +		if (row.productArity() != fieldNames.length) {
    +			throw new IllegalStateException(String.format(
    +				"Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
    +		}
    +
    +		ObjectNode objectNode = mapper.createObjectNode();
    --- End diff --
    
    Is it possible to reuse the `ObjectNode`?
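
One hedged way to do that (illustrative only, not necessarily what the PR merged): keep a lazily created `ObjectNode` on the schema and clear it with `removeAll()` before each row.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    class ReusingJsonSerializer {
        private final ObjectMapper mapper = new ObjectMapper();
        private ObjectNode node; // reused across serialize() calls

        byte[] serialize(String[] fieldNames, Object[] fields) throws Exception {
            if (node == null) {
                node = mapper.createObjectNode();
            } else {
                node.removeAll(); // drop the previous row's fields
            }
            for (int i = 0; i < fields.length; i++) {
                node.set(fieldNames[i], mapper.valueToTree(fields[i]));
            }
            return mapper.writeValueAsBytes(node);
        }
    }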



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74391066
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import org.apache.flink.api.table.Row;
    +
    +
    +/**
    + * Serialization schema that serializes an object into a JSON bytes.
    + * <p>
    --- End diff --
    
    Unnecessary `<p>`.



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    @twalthr @wuchong 
    Could you please review it one more time? This PR is blocking another similar PR.
    The test failure does not seem to have anything to do with the new code.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74701671
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -590,7 +592,7 @@ class Table(
           .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
     
         // configure the table sink
    -    val configuredSink = sink.configure(fieldNames, fieldTypes)
    +    val configuredSink = sink.asInstanceOf[TableSinkBase[T]].configure(fieldNames, fieldTypes)
    --- End diff --
    
    Good point. Fixed it.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2244



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74699836
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +
    +/**
    + * Serialization schema that serializes an object into a JSON bytes.
    + *
    + * <p>Serializes the input {@link Row} object into a JSON string and
    + * converts it into <code>byte[]</code>.
    + *
    + * <p>Result <bode>byte[]</bode> messages can be deserialized using
    + * {@link JsonRowDeserializationSchema}.
    + */
    +public class JsonRowSerializationSchema implements SerializationSchema<Row> {
    +	/** Fields names in the input Row object */
    +	private final String[] fieldNames;
    +	/** Object mapper that is used to create output JSON objects */
    +	private static ObjectMapper mapper = new ObjectMapper();
    --- End diff --
    
    OK, I see. LGTM



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    I removed fieldNames and fieldTypes from the constructor arguments and rely on the **configure** method instead. Because of that, I had to add an additional method for creating the serialization schema in subclasses, since the field names and field types are unknown at the moment of object creation.
    
    @wuchong @twalthr Could you please review the PR again?
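
A hedged sketch of that shape (simplified stand-ins for the Flink types): the constructor carries no field information, and the schema comes from an abstract factory method invoked once `configure()` supplies the field names.

    import java.util.Properties;

    interface RowSchema {
        byte[] serialize(Object[] row);
    }

    abstract class DeferredSchemaTableSink {
        private final String topic;
        private final Properties properties;
        private RowSchema serializationSchema;

        DeferredSchemaTableSink(String topic, Properties properties) {
            this.topic = topic;           // no fieldNames/fieldTypes yet
            this.properties = properties;
        }

        /** Factory for the schema, called once field names are known. */
        protected abstract RowSchema createSerializationSchema(String[] fieldNames);

        public void configure(String[] fieldNames) {
            // field names only become available here, so the schema
            // is created now rather than in the constructor
            this.serializationSchema = createSerializationSchema(fieldNames);
        }
    }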



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75938654
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
    + */
    +public class Kafka08JsonTableSink extends KafkaJsonTableSink {
    +	/**
    +	 * Creates {@link KafkaTableSink} for Kafka 0.8
    +	 *
    +	 * @param topic topic in Kafka
    --- End diff --
    
    Done.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74389977
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---
    @@ -0,0 +1,120 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.sinks.StreamTableSink;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Properties;
    +
    +import static org.apache.flink.streaming.util.TypeUtil.toTypeInfo;
    +
    +/**
    + * A version-agnostic Kafka {@link StreamTableSink}.
    + *
    + * <p>The version-specific Kafka consumers need to extend this class and
    + * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}}.
    + */
    +public abstract class KafkaTableSink implements StreamTableSink<Row> {
    +
    +	private final String topic;
    +	private final Properties properties;
    +	private final SerializationSchema<Row> serializationSchema;
    +	private final KafkaPartitioner<Row> partitioner;
    +	private final String[] fieldNames;
    +	private final TypeInformation[] fieldTypes;
    +
    +	/**
    +	 * Creates KafkaTableSink
    +	 *
    +	 * @param topic                 Kafka topic to consume.
    +	 * @param properties            Properties for the Kafka consumer.
    +	 * @param serializationSchema   Serialization schema for emitted items
    +	 * @param partitioner           Partitioner to select Kafka partition for each item
    +	 * @param fieldNames            Row field names.
    +	 * @param fieldTypes            Row field types.
    +	 */
    +	public KafkaTableSink(
    +			String topic,
    +			Properties properties,
    +			SerializationSchema<Row> serializationSchema,
    +			KafkaPartitioner<Row> partitioner,
    +			String[] fieldNames,
    +			Class<?>[] fieldTypes) {
    +		this(topic, properties, serializationSchema, partitioner, fieldNames, toTypeInfo(fieldTypes));
    +	}
    +
    +	/**
    +	 * Creates KafkaTableSink
    +	 *
    +	 * @param topic                 Kafka topic to consume.
    +	 * @param properties            Properties for the Kafka consumer.
    +	 * @param serializationSchema   Serialization schema for emitted items
    +	 * @param partitioner           Partitioner to select Kafka partition for each item
    +	 * @param fieldNames            Row field names.
    +	 * @param fieldTypes            Row field types.
    +	 */
    +	public KafkaTableSink(
    +			String topic,
    +			Properties properties,
    +			SerializationSchema<Row> serializationSchema,
    +			KafkaPartitioner<Row> partitioner,
    +			String[] fieldNames,
    +			TypeInformation<?>[] fieldTypes) {
    +
    +		this.topic = Preconditions.checkNotNull(topic, "topic");
    +		this.properties = Preconditions.checkNotNull(properties, "properties");
    +		this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema");
    +		this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
    +		this.fieldTypes = Preconditions.checkNotNull(fieldTypes);
    +		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
    +			"Number of provided field names and types does not match.");
    +	}
    +
    +	@Override
    +	public void emitDataStream(DataStream<Row> dataStream) {
    +		FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
    +		dataStream.addSink(kafkaProducer);
    +	}
    +
    +	abstract protected FlinkKafkaProducerBase<Row> createKafkaProducer(
    --- End diff --
    
    I would put this method after the constructor and add a Javadoc for it. In general, the visibility modifier should be in front of the `abstract` modifier.
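
In other words, a minimal sketch of the intended layout (return type simplified):

    abstract class StyleSketch {

        StyleSketch() {
            // constructors first
        }

        /**
         * Creates the version-specific Kafka producer.
         */
        protected abstract Object createKafkaProducer(); // visibility before "abstract"
    }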



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75938617
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -26,6 +26,8 @@ import org.apache.flink.api.table.plan.logical._
     import org.apache.flink.api.table.sinks.TableSink
     
     import scala.collection.JavaConverters._
    +import org.apache.flink.api.table.sinks.{TableSink, TableSinkBase}
    --- End diff --
    
    Done.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r76010171
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java ---
    @@ -35,19 +35,29 @@ public void testRowSerialization() throws IOException {
     		row.setField(1, true);
     		row.setField(2, "str");
     
    +		Row resultRow = serializeAndDeserialize(fieldNames, fieldTypes, row);
    +		assertEqualRows(row, resultRow);
    +	}
     
    -		JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
    -		JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes);
    +	@Test
    +	public void testSerializationOfTwoRows() throws IOException {
    --- End diff --
    
    Actually, the test I wanted to propose should serialize two rows (with the same schema) using the same `JsonRowSerializationSchema` and `JsonRowDeserializationSchema` to make sure that there are no side effects. This is also what happens in the table source.
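
A hedged sketch of that test shape (the serializer below is a stand-in, not the PR's `JsonRowSerializationSchema`): one schema instance serializes two rows of the same schema, so any state leaking between calls would fail the assertions.

    import static org.junit.Assert.assertArrayEquals;

    import org.junit.Test;

    public class TwoRowsNoSideEffectsTest {

        interface RowSerializer {
            byte[] serialize(Object[] row);
        }

        @Test
        public void testSerializationOfTwoRows() {
            // stand-in; the real test would reuse one JsonRowSerializationSchema
            RowSerializer schema = row -> java.util.Arrays.toString(row).getBytes();

            byte[] first = schema.serialize(new Object[]{1, true, "str"});
            byte[] second = schema.serialize(new Object[]{2, false, "other"});

            // neither result may be affected by the other call
            assertArrayEquals("[1, true, str]".getBytes(), first);
            assertArrayEquals("[2, false, other]".getBytes(), second);
        }
    }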



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r70800002
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.io.DatumWriter;
    +import org.apache.avro.io.Encoder;
    +import org.apache.avro.io.EncoderFactory;
    +import org.apache.avro.reflect.ReflectData;
    +import org.apache.avro.reflect.ReflectDatumReader;
    +import org.apache.avro.reflect.ReflectDatumWriter;
    +import org.apache.flink.streaming.util.serialization.AvroSerializationSchema;
    +import org.junit.Test;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.Objects;
    +
    +import static javafx.scene.input.KeyCode.T;
    +import static org.junit.Assert.assertEquals;
    +
    +public class AvroSerializationSchemaTest {
    +//	@Test
    --- End diff --
    
    Why is this commented out?



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74705587
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -592,7 +592,7 @@ class Table(
           .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
     
         // configure the table sink
    -    val configuredSink = sink.asInstanceOf[TableSinkBase[T]].configure(fieldNames, fieldTypes)
    +    val configuredSink = sink.asInstanceOf[TableSink[T]].configure(fieldNames, fieldTypes)
    --- End diff --
    
    Right. Removed it.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74697524
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +
    +/**
    + * Serialization schema that serializes an object into JSON bytes.
    + *
    + * <p>Serializes the input {@link Row} object into a JSON string and
    + * converts it into <code>byte[]</code>.
    + *
    + * <p>Resulting <code>byte[]</code> messages can be deserialized using
    + * {@link JsonRowDeserializationSchema}.
    + */
    +public class JsonRowSerializationSchema implements SerializationSchema<Row> {
    +	/** Field names in the input Row object */
    +	private final String[] fieldNames;
    +	/** Object mapper that is used to create output JSON objects */
    +	private static ObjectMapper mapper = new ObjectMapper();
    --- End diff --
    
    Is this thread safe?  I think the ObjectMapper should be non-static.
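
    One way to address the concern, as a minimal sketch (illustrative only, not
    code from the PR): make the mapper a transient, per-instance field that is
    created lazily inside the task, so that every parallel subtask owns its own
    ObjectMapper and the schema object itself stays serializable.

        import com.fasterxml.jackson.databind.ObjectMapper;
        import com.fasterxml.jackson.databind.node.ObjectNode;
        import org.apache.flink.api.table.Row;
        import org.apache.flink.util.Preconditions;

        public class JsonRowSerializationSchema implements SerializationSchema<Row> {
            private final String[] fieldNames;
            /** Created on first use, one instance per parallel subtask. */
            private transient ObjectMapper mapper;

            public JsonRowSerializationSchema(String[] fieldNames) {
                this.fieldNames = Preconditions.checkNotNull(fieldNames);
            }

            @Override
            public byte[] serialize(Row row) {
                if (mapper == null) {
                    mapper = new ObjectMapper();
                }
                ObjectNode objectNode = mapper.createObjectNode();
                for (int i = 0; i < row.productArity(); i++) {
                    objectNode.set(fieldNames[i], mapper.valueToTree(row.productElement(i)));
                }
                try {
                    return mapper.writeValueAsBytes(objectNode);
                } catch (Exception e) {
                    throw new RuntimeException("Failed to serialize row", e);
                }
            }
        }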


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74699827
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +
    +/**
    + * Serialization schema that serializes an object into JSON bytes.
    + *
    + * <p>Serializes the input {@link Row} object into a JSON string and
    + * converts it into <code>byte[]</code>.
    + *
    + * <p>Resulting <code>byte[]</code> messages can be deserialized using
    + * {@link JsonRowDeserializationSchema}.
    + */
    +public class JsonRowSerializationSchema implements SerializationSchema<Row> {
    +	/** Field names in the input Row object */
    +	private final String[] fieldNames;
    +	/** Object mapper that is used to create output JSON objects */
    +	private static ObjectMapper mapper = new ObjectMapper();
    +
    +	/**
    +	 * Creates a JSON serialization schema for the given fields and types.
    +	 *
    +	 * @param fieldNames Names of JSON fields to parse.
    +	 */
    +	public JsonRowSerializationSchema(String[] fieldNames) {
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames);
    +	}
    +
    +	@Override
    +	public byte[] serialize(Row row) {
    +		if (row.productArity() != fieldNames.length) {
    +			throw new IllegalStateException(String.format(
    +				"Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
    +		}
    +
    +		ObjectNode objectNode = mapper.createObjectNode();
    +		for (int i = 0; i < row.productArity(); i++) {
    +			JsonNode node = mapper.valueToTree(row.productElement(i));
    +			objectNode.set(fieldNames[i], node);
    +		}
    +
    +		try {
    +			return mapper.writeValueAsBytes(objectNode);
    +		} catch (Exception e) {
    +			throw new RuntimeException("Failed to serialize row", e);
    --- End diff --
    
    Oh, you are right. It looks good to me now. 


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74388296
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
    + */
    +public class Kafka08JsonTableSink extends KafkaJsonTableSink {
    +	/**
    +	 * Creates {@link KafkaTableSink} for Kafka 0.8
    +	 *
    +	 * @param topic topic in Kafka
    +	 * @param properties properties to connect to Kafka
    +	 * @param partitioner Kafka partitioner
    +	 * @param fieldNames row field names
    +	 * @param fieldTypes row field types
    +	 */
    +	public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, String[] fieldNames, Class<?>[] fieldTypes) {
    +		super(topic, properties, partitioner, fieldNames, fieldTypes);
    +	}
    +
    +	/**
    +	 * Creates {@link KafkaTableSink} for Kafka 0.9
    --- End diff --
    
    Should be 0.8


---

[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    I've updated the PR according to the review and fixed the build (I was using a method from JDK 8).


---

[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    Hi @mushketyk, thanks for the fast reply. 
    I saw your comments but no new commit. Did you forget to push to your PR branch?
    
    Thanks, Fabian


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75938937
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
    +import org.junit.Test;
    +
    +public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
    +	@Test
    +	public void kafka09JsonTableSinkTest() throws Exception {
    --- End diff --
    
    Removed.


---

[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    @twalthr Thank you for your detailed review. I am very new to Apache Flink at the moment, so I made some absurd changes. I'll try to avoid this in the future.
    
    I'll update the PR according to your comments in a day or two.


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75836176
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.sinks.StreamTableSink;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Properties;
    +
    +/**
    + * A version-agnostic Kafka {@link StreamTableSink}.
    + *
    + * <p>The version-specific Kafka table sinks need to extend this class and
    + * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}}.
    + */
    +public abstract class KafkaTableSink implements StreamTableSink<Row> {
    +
    +	private final String topic;
    +	private final Properties properties;
    +	private SerializationSchema<Row> serializationSchema;
    +	private final KafkaPartitioner<Row> partitioner;
    +	private String[] fieldNames;
    +	private TypeInformation[] fieldTypes;
    +	/**
    +	 * Creates a KafkaTableSink.
    +	 *
    +	 * @param topic                 Kafka topic to write to.
    +	 * @param properties            Properties for the Kafka producer.
    +	 * @param partitioner           Partitioner to select Kafka partition for each item
    +	 */
    +	public KafkaTableSink(
    +			String topic,
    +			Properties properties,
    +			KafkaPartitioner<Row> partitioner) {
    +
    +		this.topic = Preconditions.checkNotNull(topic, "topic");
    +		this.properties = Preconditions.checkNotNull(properties, "properties");
    +		this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
    +	}
    +
    +	/**
    +	 * Returns the version-specific Kafka producer.
    +	 *
    +	 * @param topic               Kafka topic to produce to.
    +	 * @param properties          Properties for the Kafka producer.
    +	 * @param serializationSchema Serialization schema to use to create Kafka records.
    +	 * @param partitioner         Partitioner to select Kafka partition.
    +	 * @return The version-specific Kafka producer
    +	 */
    +	protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
    +		String topic, Properties properties,
    +		SerializationSchema<Row> serializationSchema,
    +		KafkaPartitioner<Row> partitioner);
    +
    +	protected abstract SerializationSchema<Row> createSerializationSchema(String[] fieldNames);
    +
    +	@Override
    +	public void emitDataStream(DataStream<Row> dataStream) {
    +		FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
    +		dataStream.addSink(kafkaProducer);
    +	}
    +
    +	@Override
    +	public TypeInformation<Row> getOutputType() {
    +		return new RowTypeInfo(getFieldTypes());
    +	}
    +
    +	public String[] getFieldNames() {
    +		return fieldNames;
    +	}
    +
    +	@Override
    +	public TypeInformation<?>[] getFieldTypes() {
    +		return fieldTypes;
    +	}
    +
    +	@Override
    +	public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
    +		this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
    +		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
    +			"Number of provided field names and types does not match.");
    +		this.serializationSchema = createSerializationSchema(fieldNames);
    +
    +		return this;
    --- End diff --
    
    `configure()` should return a copy of this object.
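
    A minimal sketch of that change, assuming a hypothetical createCopy()
    hook implemented by each version-specific subclass:

        @Override
        public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            KafkaTableSink copy = createCopy();  // hypothetical abstract factory method
            copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
            copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
            Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
                "Number of provided field names and types does not match.");
            copy.serializationSchema = createSerializationSchema(fieldNames);
            return copy;
        }

        /** Hypothetical hook: returns an unconfigured copy of this sink. */
        protected abstract KafkaTableSink createCopy();

    Returning a copy keeps the original sink object reusable for writing
    several tables.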


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75938899
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
    +import org.junit.Test;
    +
    +public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
    +	@Test
    +	public void kafka08JsonTableSinkTest() throws Exception {
    --- End diff --
    
    Done.


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75938989
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java ---
    @@ -0,0 +1,68 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +public class JsonRowSerializationSchemaTest {
    +	@Test
    +	public void testRowSerialization() throws IOException {
    +		String[] fieldNames = new String[] {"f1", "f2", "f3"};
    +		Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class};
    +		Row row = new Row(3);
    --- End diff --
    
    Done.


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75938972
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +
    +/**
    + * Serialization schema that serializes an object into JSON bytes.
    + *
    + * <p>Serializes the input {@link Row} object into a JSON string and
    + * converts it into <code>byte[]</code>.
    + *
    + * <p>Resulting <code>byte[]</code> messages can be deserialized using
    + * {@link JsonRowDeserializationSchema}.
    + */
    +public class JsonRowSerializationSchema implements SerializationSchema<Row> {
    +	/** Field names in the input Row object */
    +	private final String[] fieldNames;
    +	/** Object mapper that is used to create output JSON objects */
    +	private static ObjectMapper mapper = new ObjectMapper();
    +
    +	/**
    +	 * Creates a JSON serialization schema for the given fields and types.
    +	 *
    +	 * @param fieldNames Names of JSON fields to parse.
    +	 */
    +	public JsonRowSerializationSchema(String[] fieldNames) {
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames);
    +	}
    +
    +	@Override
    +	public byte[] serialize(Row row) {
    +		if (row.productArity() != fieldNames.length) {
    +			throw new IllegalStateException(String.format(
    +				"Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
    +		}
    +
    +		ObjectNode objectNode = mapper.createObjectNode();
    --- End diff --
    
    Done.


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75938632
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
    + */
    +public class Kafka08JsonTableSink extends KafkaJsonTableSink {
    +	/**
    +	 * Creates {@link KafkaTableSink} for Kafka 0.8
    +	 *
    +	 * @param topic topic in Kafka
    +	 * @param properties properties to connect to Kafka
    +	 * @param partitioner Kafka partitioner
    +	 */
    +	public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
    +		super(topic, properties, partitioner);
    +	}
    +
    +	@Override
    +	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
    +		return new FlinkKafkaProducer08<Row>(topic, serializationSchema, properties, partitioner);
    --- End diff --
    
    Done.


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74634369
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java ---
    @@ -16,12 +16,14 @@
      */
     package org.apache.flink.streaming.util.serialization;
     
    +import java.io.Serializable;
    +
     /**
      * A simple wrapper for using the SerializationSchema with the KeyedDeserializationSchema
      * interface
      * @param <T> The type to serialize
      */
    -public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
    +public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T>, Serializable {
    --- End diff --
    
    Fixed.


---

[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    Fixed build.


---

[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    @mushketyk I will review it this week.


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75835537
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
    + */
    +public class Kafka08JsonTableSink extends KafkaJsonTableSink {
    +	/**
    +	 * Creates {@link KafkaTableSink} for Kafka 0.8
    +	 *
    +	 * @param topic topic in Kafka
    --- End diff --
    
    Please make explicit that this is the topic to which the table is written.
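
    For example (wording illustrative only):

        * @param topic Kafka topic to which the table is written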


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74704156
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala ---
    @@ -0,0 +1,68 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.sinks
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +
    +trait TableSinkBase[T] extends TableSink[T] {
    --- End diff --
    
    Makes sense. I'll update **KafkaTableSink** accordingly.


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74633954
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import org.apache.flink.api.table.Row;
    +
    +
    +/**
    + * Serialization schema that serializes an object into JSON bytes.
    + */
    +public class JsonRowSerializationSchema implements SerializationSchema<Row> {
    +	private final String[] fieldNames;
    +	private static ObjectMapper mapper = new ObjectMapper();
    +
    +	public JsonRowSerializationSchema(String[] fieldNames) {
    +		this.fieldNames = fieldNames;
    --- End diff --
    
    Done.


---

[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    Hey @fhueske 
    I've renamed tests as you suggested.


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75835662
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
    + */
    +public class Kafka09JsonTableSink extends KafkaJsonTableSink {
    +	/**
    +	 * Creates {@link KafkaTableSink} for Kafka 0.9
    +	 *
    +	 * @param topic topic in Kafka
    --- End diff --
    
    Please make explicit that this is the topic to which the table is written.



---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74633786
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
    + */
    +public class Kafka08JsonTableSink extends KafkaJsonTableSink {
    +	/**
    +	 * Creates {@link KafkaTableSink} for Kafka 0.8
    +	 *
    +	 * @param topic topic in Kafka
    +	 * @param properties properties to connect to Kafka
    +	 * @param partitioner Kafka partitioner
    +	 * @param fieldNames row field names
    +	 * @param fieldTypes row field types
    +	 */
    +	public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, String[] fieldNames, Class<?>[] fieldTypes) {
    +		super(topic, properties, partitioner, fieldNames, fieldTypes);
    +	}
    +
    +	/**
    +	 * Creates {@link KafkaTableSink} for Kafka 0.9
    --- End diff --
    
    Fixed.


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74387953
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.sink
    +
    +import java.io.File
    +import java.nio.charset.Charset
    +import java.nio.file.Files
    +import java.util.Collections
    +
    +import org.apache.flink.api.java.ExecutionEnvironment
    +import org.apache.flink.api.java.operators.DataSource
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.api.table.sinks.CsvTableSink
    +import org.junit.rules.TemporaryFolder
    +import org.junit.{Rule, Test}
    +import org.junit.Assert._
    +
    +class CsvTableSinkTest {
    --- End diff --
    
    Could you move the tests to `TableSinkITCase`? It already tests the CSV sink a bit. It would be great if you could design your tests similarly, so that we have consistent tests. Btw, we always call our test methods `testXXX()`.


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74703734
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala ---
    @@ -0,0 +1,68 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.sinks
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +
    +trait TableSinkBase[T] extends TableSink[T] {
    --- End diff --
    
    It makes sense to keep TableSink consistent with TableSource. But there is some duplication, as `KafkaTableSink` re-implements what `TableSinkBase` already provides.
    
    BTW, I think it's better to provide a constructor for KafkaTableSink without fieldNames and fieldTypes, because `Table.writeToSink` will inject these into the TableSink automatically, as sketched below.
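
    A usage sketch (topic name, properties and partitioner are placeholders):

        // Field names/types are injected by Table.writeToSink(), which calls
        // sink.configure(fieldNames, fieldTypes) before emitting the stream.
        KafkaTableSink sink = new Kafka09JsonTableSink("my-topic", kafkaProps, partitioner);
        table.writeToSink(sink);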


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74633115
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala ---
    @@ -31,6 +31,10 @@ class Row(arity: Int) extends Product {
     
       def setField(i: Int, value: Any): Unit = fields(i) = value
     
    +  def getField(i: Int): Any = fields(i)
    --- End diff --
    
    Removed new methods.


---

[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    LGTM except a few nits. 


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75938768
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
    + */
    +public class Kafka09JsonTableSink extends KafkaJsonTableSink {
    +	/**
    +	 * Creates {@link KafkaTableSink} for Kafka 0.9
    +	 *
    +	 * @param topic topic in Kafka
    --- End diff --
    
    Done.


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74386541
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala ---
    @@ -31,6 +31,10 @@ class Row(arity: Int) extends Product {
     
       def setField(i: Int, value: Any): Unit = fields(i) = value
     
    +  def getField(i: Int): Any = fields(i)
    --- End diff --
    
    Why did you add those methods? `productArity` and `productElement` already do what you want.
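
    For illustration (sketch):

        // Reading Row fields through the existing Product methods.
        for (int i = 0; i < row.productArity(); i++) {
            Object value = row.productElement(i);
            // ... serialize or inspect value ...
        }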


---

[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    Hi @mushketyk, thanks for the PR. Looks quite good. I added a few comments.
    
    Cheers, Fabian  


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75836733
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java ---
    @@ -0,0 +1,68 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +public class JsonRowSerializationSchemaTest {
    +	@Test
    +	public void testRowSerialization() throws IOException {
    +		String[] fieldNames = new String[] {"f1", "f2", "f3"};
    +		Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class};
    +		Row row = new Row(3);
    --- End diff --
    
    Please test another row to make sure that serialization / deserialization works for multiple objects.
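
    A sketch of such an addition, reusing the schema instances created earlier
    in the test, with arbitrary values and a hypothetical field-by-field
    assertion helper:

        // Round-trip a second row through the same schema instances.
        Row row2 = new Row(3);
        row2.setField(0, 10);
        row2.setField(1, false);
        row2.setField(2, "newStr");

        byte[] bytes = serializationSchema.serialize(row2);
        Row resultRow = deserializationSchema.deserialize(bytes);
        assertEqualRows(row2, resultRow);  // hypothetical helper comparing productElement(i)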


---

[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75835821
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
    +import org.junit.Test;
    +
    +public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
    +	@Test
    +	public void kafka08JsonTableSinkTest() throws Exception {
    --- End diff --
    
    If we add a `@Test` annotation to `KafkaTableSinkTestBase.testKafkaTableSink()` we do not need this method.
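
    A minimal sketch of that pattern, assuming a hypothetical `createTableSink()` factory on the base class (JUnit runs `@Test` methods inherited from an abstract superclass):
    
    ```java
    public abstract class KafkaTableSinkTestBase extends KafkaTestBase {
    
    	@Test
    	public void testKafkaTableSink() throws Exception {
    		// version-agnostic test logic runs against the sink
    		// supplied by the concrete subclass
    		KafkaTableSink sink = createTableSink();
    		// ... exercise the sink ...
    	}
    
    	// each version-specific test class only provides its sink
    	protected abstract KafkaTableSink createTableSink();
    }
    
    public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
    	@Override
    	protected KafkaTableSink createTableSink() {
    		// constructor arguments follow the 0.8 sink's signature shown
    		// elsewhere in this thread; the partitioner is elided here
    		return new Kafka08JsonTableSink(TOPIC, new Properties(), null,
    			FIELD_NAMES, new Class[] {Integer.class, String.class});
    	}
    	// no local @Test needed: testKafkaTableSink() is inherited and run by JUnit
    }
    ```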



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    Thank you for your review @fhueske. I've updated it. Could you please take a look at it?



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74634350
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import org.apache.flink.api.table.Row;
    +
    +
    +/**
    + * Serialization schema that serializes an object into a JSON bytes.
    + * <p>
    + */
    +public class JsonRowSerializationSchema implements SerializationSchema<Row> {
    +	private final String[] fieldNames;
    +	private static ObjectMapper mapper = new ObjectMapper();
    +
    +	public JsonRowSerializationSchema(String[] fieldNames) {
    +		this.fieldNames = fieldNames;
    +	}
    +
    +	@Override
    +	public byte[] serialize(Row row) {
    +
    +		ObjectNode objectNode = mapper.createObjectNode();
    +		for (int i = 0; i < row.getFieldNumber(); i++) {
    +			JsonNode node = mapper.valueToTree(row.getField(i));
    +			objectNode.set(fieldNames[i], node);
    +		}
    +
    +		try {
    +			return mapper.writeValueAsBytes(objectNode);
    +		} catch (Exception e) {
    +			throw new RuntimeException(e);
    --- End diff --
    
    **JsonRowDeserializationSchema** implements **DeserializationSchema**, which has the following method:
    ```java
    T deserialize(byte[] message) throws IOException;
    ```
    
    while **JsonRowSerializationSchema** implements **SerializationSchema**, which has only this method:
    
    ```java
    byte[] serialize(T element); // throws no exceptions
    ```
    
    Do you suggest changing the **SerializationSchema** interface to throw **IOException**?
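
    For reference, a sketch of the wrapping approach that keeps the `SerializationSchema` interface unchanged; Jackson's `JsonProcessingException` (an `IOException` subtype) is the checked exception that `writeValueAsBytes` can throw:
    
    ```java
    @Override
    public byte[] serialize(Row row) {
    	ObjectNode objectNode = mapper.createObjectNode();
    	for (int i = 0; i < row.productArity(); i++) {
    		objectNode.set(fieldNames[i], mapper.valueToTree(row.productElement(i)));
    	}
    	try {
    		return mapper.writeValueAsBytes(objectNode);
    	} catch (JsonProcessingException e) {
    		// serialize() declares no checked exceptions, so the checked
    		// JsonProcessingException is wrapped in an unchecked one
    		throw new RuntimeException("Failed to serialize row: " + row, e);
    	}
    }
    ```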




[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75938408
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala ---
    @@ -41,46 +38,13 @@ trait TableSink[T] {
         */
       def getOutputType: TypeInformation[T]
     
    -  /** Return a deep copy of the [[TableSink]]. */
    -  protected def copy: TableSink[T]
    -
    -  /**
    -    * Return the field names of the [[org.apache.flink.api.table.Table]] to emit. */
    -  protected final def getFieldNames: Array[String] = {
    -    fieldNames match {
    -      case Some(n) => n
    -      case None => throw new IllegalStateException(
    -        "TableSink must be configured to retrieve field names.")
    -    }
    -  }
    -
    -  /** Return the field types of the [[org.apache.flink.api.table.Table]] to emit. */
    -  protected final def getFieldTypes: Array[TypeInformation[_]] = {
    -    fieldTypes match {
    -      case Some(t) => t
    -      case None => throw new IllegalStateException(
    -        "TableSink must be configured to retrieve field types.")
    -    }
    -  }
    -
    -  /**
    -    * Return a copy of this [[TableSink]] configured with the field names and types of the
    --- End diff --
    
    Done.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75836831
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.apache.flink.test.util.SuccessException;
    +
    +import java.io.Serializable;
    +import java.util.HashSet;
    +import java.util.Properties;
    +
    +import static org.apache.flink.test.util.TestUtils.tryExecute;
    +
    +public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable {
    +
    +	protected final static String TOPIC = "customPartitioningTestTopic";
    +	protected final static int PARALLELISM = 1;
    +	protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
    +	protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
    +
    +	public void testKafkaTableSink() throws Exception {
    --- End diff --
    
    If you add a `@Test` annotation here, we do not need the test methods in the classes that extend this TestBase.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74699857
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -590,7 +592,7 @@ class Table(
           .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
     
         // configure the table sink
    -    val configuredSink = sink.configure(fieldNames, fieldTypes)
    +    val configuredSink = sink.asInstanceOf[TableSinkBase[T]].configure(fieldNames, fieldTypes)
    --- End diff --
    
    It seems that KafkaTableSink doesn't extend TableSinkBase, so does this work if the input sink is a KafkaTableSink?



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74701625
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
    + */
    +public class Kafka08JsonTableSink extends KafkaJsonTableSink {
    +	/**
    +	 * Creates {@link KafkaTableSink} for Kafka 0.8
    +	 *
    +	 * @param topic topic in Kafka
    +	 * @param properties properties to connect to Kafka
    +	 * @param partitioner Kafra partitioner
    --- End diff --
    
    Good spot! Fixed it.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74390581
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/TypeUtil.java ---
    @@ -0,0 +1,38 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util;
    --- End diff --
    
    I think this class would be better located under `org.apache.flink.streaming.connectors.kafka.internals`. It should at least be located under `org.apache.flink.streaming.connectors.kafka` rather than polluting the upper streaming namespace.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74392212
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java ---
    @@ -16,12 +16,14 @@
      */
     package org.apache.flink.streaming.util.serialization;
     
    +import java.io.Serializable;
    +
     /**
      * A simple wrapper for using the SerializationSchema with the KeyedDeserializationSchema
      * interface
      * @param <T> The type to serialize
      */
    -public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
    +public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T>, Serializable {
    --- End diff --
    
    `KeyedSerializationSchema` already implements `Serializable`.
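
    A short illustration of why the extra declaration is redundant (interface shape abbreviated; method names are assumptions based on the connector's API):
    
    ```java
    // An interface that extends Serializable makes every implementation
    // serializable; re-declaring it on the class adds nothing.
    public interface KeyedSerializationSchema<T> extends Serializable {
    	byte[] serializeKey(T element);
    	byte[] serializeValue(T element);
    	// ...
    }
    
    public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
    	// already Serializable through the interface
    }
    ```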



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    Could we move `CsvTableSinkTest` into `TableSinkITCase`, or just remove `CsvTableSinkTest`? I think `TableSinkITCase` already tests `CsvTableSink` with both the default and a custom delimiter.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74700046
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala ---
    @@ -0,0 +1,68 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.sinks
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +
    +trait TableSinkBase[T] extends TableSink[T] {
    --- End diff --
    
    IMHO, extracting the methods from `TableSink` is a bit trivial. Maybe making `TableSink` an `abstract class` would solve this problem.



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    @mushketyk Thanks for your PR. I added some comments to the code. The changes you made to the `TableSink` class structure seem reasonable. However, it might be better if those changes were made in a separate Jira issue and PR.
    
    In general, it would be great if you tried to touch as little existing code as possible and followed the existing code base more closely.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r70870120
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.io.DatumWriter;
    +import org.apache.avro.io.Encoder;
    +import org.apache.avro.io.EncoderFactory;
    +import org.apache.avro.reflect.ReflectData;
    +import org.apache.avro.reflect.ReflectDatumReader;
    +import org.apache.avro.reflect.ReflectDatumWriter;
    +import org.apache.flink.streaming.util.serialization.AvroSerializationSchema;
    +import org.junit.Test;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.Objects;
    +
    +import static javafx.scene.input.KeyCode.T;
    +import static org.junit.Assert.assertEquals;
    +
    +public class AvroSerializationSchemaTest {
    +//	@Test
    --- End diff --
    
    Sorry, I accidentally added this file to this PR.
    I've already removed it.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74391348
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import org.apache.flink.api.table.Row;
    +
    +
    +/**
    + * Serialization schema that serializes an object into a JSON bytes.
    + * <p>
    + */
    +public class JsonRowSerializationSchema implements SerializationSchema<Row> {
    +	private final String[] fieldNames;
    +	private static ObjectMapper mapper = new ObjectMapper();
    +
    +	public JsonRowSerializationSchema(String[] fieldNames) {
    +		this.fieldNames = fieldNames;
    --- End diff --
    
    It would be good to add a null check here and also some Javadocs. You can use `JsonRowDeserializationSchema` as a reference. The two classes should look consistent.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r76010265
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---
    @@ -75,6 +75,8 @@ public KafkaTableSink(
     
     	protected abstract SerializationSchema<Row> createSerializationSchema(String[] fieldNames);
     
    +	protected abstract KafkaTableSink createCopy();
    --- End diff --
    
    Maybe add a comment that the copy should be a deep copy?
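
    For illustration, a hedged sketch of what a deep `createCopy()` could look like in a concrete subclass (field names follow the diffs in this thread; a stateful partitioner would need copying as well):
    
    ```java
    @Override
    protected KafkaTableSink createCopy() {
    	// Properties is mutable, so copy it rather than share it;
    	// topic is an immutable String and can be reused directly.
    	Properties copiedProperties = new Properties();
    	copiedProperties.putAll(this.properties);
    	return new Kafka08JsonTableSink(this.topic, copiedProperties, this.partitioner);
    }
    ```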



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    Hi @fhueske. Thank you for your code review. I'll update the PR today.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74699842
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
    + */
    +public class Kafka08JsonTableSink extends KafkaJsonTableSink {
    +	/**
    +	 * Creates {@link KafkaTableSink} for Kafka 0.8
    +	 *
    +	 * @param topic topic in Kafka
    +	 * @param properties properties to connect to Kafka
    +	 * @param partitioner Kafra partitioner
    +	 * @param fieldNames row field names
    +	 * @param fieldTypes row field types
    +	 */
    +	public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, String[] fieldNames, Class<?>[] fieldTypes) {
    +		super(topic, properties, partitioner, fieldNames, fieldTypes);
    +	}
    +
    +	/**
    +	 * Creates {@link KafkaTableSink} for Kafka 0.8
    +	 *
    +	 * @param topic topic in Kafka
    +	 * @param properties properties to connect to Kafka
    +	 * @param partitioner Kafra partitioner
    --- End diff --
    
    Kafra -> Kafka



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74633822
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/TypeUtil.java ---
    @@ -0,0 +1,38 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util;
    --- End diff --
    
    Done.



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    @twalthr Great! Thank you for your help.



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    @twalthr Could you review this one more time?



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    Looks good to merge @mushketyk :-)
    The only thing I'd change is renaming `Kafka08JsonTableSinkTest` to `Kafka08JsonTableSinkITCase` (same for Kafka09*), since these are long-running integration tests which should be executed in the appropriate Maven phase (verify).
    I can do that as well before merging.
    
    Thanks, Fabian



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75938748
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
    + */
    +public class Kafka09JsonTableSink extends KafkaJsonTableSink {
    +	/**
    +	 * Creates {@link KafkaTableSink} for Kafka 0.9
    +	 *
    +	 * @param topic topic in Kafka
    +	 * @param properties properties to connect to Kafka
    +	 * @param partitioner Kafka partitioner
    +	 */
    +	public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
    +		super(topic, properties, partitioner);
    +	}
    +
    +	@Override
    +	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
    --- End diff --
    
    Done.



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    Hi @fhueske, I am running `mvn clean verify` locally right now and I hope to update the commit soon.
    
    Sorry for confusing you.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74633704
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/sink/CsvTableSinkTest.scala ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.sink
    +
    +import java.io.File
    +import java.nio.charset.Charset
    +import java.nio.file.Files
    +import java.util.Collections
    +
    +import org.apache.flink.api.java.ExecutionEnvironment
    +import org.apache.flink.api.java.operators.DataSource
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.api.table.sinks.CsvTableSink
    +import org.junit.rules.TemporaryFolder
    +import org.junit.{Rule, Test}
    +import org.junit.Assert._
    +
    +class CsvTableSinkTest {
    --- End diff --
    
    The TableSinkITCase class seems to be an integration test (expensive and high-level), while my intention was to create a unit test specifically for CsvTableSink (cheap and low-level) to make sure I don't break it with my changes. Therefore I thought it was better to keep them separate.
    
    Do you think it's better to merge them and turn the unit tests into integration tests?



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74703534
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -592,7 +592,7 @@ class Table(
           .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
     
         // configure the table sink
    -    val configuredSink = sink.asInstanceOf[TableSinkBase[T]].configure(fieldNames, fieldTypes)
    +    val configuredSink = sink.asInstanceOf[TableSink[T]].configure(fieldNames, fieldTypes)
    --- End diff --
    
    no need to cast



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74633921
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import org.apache.flink.api.table.Row;
    +
    +
    +/**
    + * Serialization schema that serializes an object into a JSON bytes.
    + * <p>
    --- End diff --
    
    Fixed.



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    The CsvTableSinkTest failed; can you check it?



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75939007
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.apache.flink.test.util.SuccessException;
    +
    +import java.io.Serializable;
    +import java.util.HashSet;
    +import java.util.Properties;
    +
    +import static org.apache.flink.test.util.TestUtils.tryExecute;
    +
    +public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable {
    +
    +	protected final static String TOPIC = "customPartitioningTestTopic";
    +	protected final static int PARALLELISM = 1;
    +	protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
    +	protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
    +
    +	public void testKafkaTableSink() throws Exception {
    --- End diff --
    
    Done.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74391701
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import org.apache.flink.api.table.Row;
    +
    +
    +/**
    + * Serialization schema that serializes an object into a JSON bytes.
    + * <p>
    + */
    +public class JsonRowSerializationSchema implements SerializationSchema<Row> {
    +	private final String[] fieldNames;
    +	private static ObjectMapper mapper = new ObjectMapper();
    +
    +	public JsonRowSerializationSchema(String[] fieldNames) {
    +		this.fieldNames = fieldNames;
    +	}
    +
    +	@Override
    +	public byte[] serialize(Row row) {
    +
    +		ObjectNode objectNode = mapper.createObjectNode();
    +		for (int i = 0; i < row.getFieldNumber(); i++) {
    +			JsonNode node = mapper.valueToTree(row.getField(i));
    +			objectNode.set(fieldNames[i], node);
    +		}
    +
    +		try {
    +			return mapper.writeValueAsBytes(objectNode);
    +		} catch (Exception e) {
    +			throw new RuntimeException(e);
    --- End diff --
    
    Consistency with `JsonRowDeserializationSchema` should also extend to exceptions. A more meaningful message would be good here.



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    Thank you for helping with this PR, @fhueske.
    It's ok, I'll update the PR in an hour or so.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74697490
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +
    +/**
    + * Serialization schema that serializes an object into a JSON bytes.
    + *
    + * <p>Serializes the input {@link Row} object into a JSON string and
    + * converts it into <code>byte[]</code>.
    + *
    + * <p>Result <bode>byte[]</bode> messages can be deserialized using
    + * {@link JsonRowDeserializationSchema}.
    + */
    +public class JsonRowSerializationSchema implements SerializationSchema<Row> {
    +	/** Fields names in the input Row object */
    +	private final String[] fieldNames;
    +	/** Object mapper that is used to create output JSON objects */
    +	private static ObjectMapper mapper = new ObjectMapper();
    +
    +	/**
    +	 * Creates a JSON serialization schema for the given fields and types.
    +	 *
    +	 * @param fieldNames Names of JSON fields to parse.
    +	 */
    +	public JsonRowSerializationSchema(String[] fieldNames) {
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames);
    +	}
    +
    +	@Override
    +	public byte[] serialize(Row row) {
    +		if (row.productArity() != fieldNames.length) {
    +			throw new IllegalStateException(String.format(
    +				"Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
    +		}
    +
    +		ObjectNode objectNode = mapper.createObjectNode();
    +		for (int i = 0; i < row.productArity(); i++) {
    +			JsonNode node = mapper.valueToTree(row.productElement(i));
    +			objectNode.set(fieldNames[i], node);
    +		}
    +
    +		try {
    +			return mapper.writeValueAsBytes(objectNode);
    +		} catch (Exception e) {
    +			throw new RuntimeException("Failed to serialize row", e);
    --- End diff --
    
    Would it be better to throw an IOException here?



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    @wuchong 
    
    Tests passed on most platforms, and the failures do not seem to be related to the code that I added/changed:
    
    ```
    java.lang.IllegalStateException: cannot create children while terminating or terminated
    	at akka.actor.dungeon.Children$class.makeChild(Children.scala:200)
    	at akka.actor.dungeon.Children$class.attachChild(Children.scala:40)
    	at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
    	at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:555)
    	at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:120)
    	at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:485)
    	at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:471)
    	at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:463)
    	at org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:78)
    	at org.apache.flink.api.table.sink.CsvTableSinkTest.writeToCsv(CsvTableSinkTest.scala:69)
    	at org.apache.flink.api.table.sink.CsvTableSinkTest.testSaveDataSetToCsvFileWithDefaultDelimiter(CsvTableSinkTest.scala:48)
    ```
    
    Are there issues with these platforms?



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74633804
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---
    @@ -0,0 +1,120 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.sinks.StreamTableSink;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Properties;
    +
    +import static org.apache.flink.streaming.util.TypeUtil.toTypeInfo;
    +
    +/**
    + * A version-agnostic Kafka {@link StreamTableSink}.
    + *
    + * <p>The version-specific Kafka table sinks need to extend this class and
    + * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}.
    + */
    +public abstract class KafkaTableSink implements StreamTableSink<Row> {
    +
    +	private final String topic;
    +	private final Properties properties;
    +	private final SerializationSchema<Row> serializationSchema;
    +	private final KafkaPartitioner<Row> partitioner;
    +	private final String[] fieldNames;
    +	private final TypeInformation[] fieldTypes;
    +
    +	/**
    +	 * Creates a KafkaTableSink.
    +	 *
    +	 * @param topic                 Kafka topic to write to.
    +	 * @param properties            Properties for the Kafka producer.
    +	 * @param serializationSchema   Serialization schema for emitted items.
    +	 * @param partitioner           Partitioner to select the Kafka partition for each item.
    +	 * @param fieldNames            Row field names.
    +	 * @param fieldTypes            Row field types.
    +	 */
    +	public KafkaTableSink(
    +			String topic,
    +			Properties properties,
    +			SerializationSchema<Row> serializationSchema,
    +			KafkaPartitioner<Row> partitioner,
    +			String[] fieldNames,
    +			Class<?>[] fieldTypes) {
    +		this(topic, properties, serializationSchema, partitioner, fieldNames, toTypeInfo(fieldTypes));
    +	}
    +
    +	/**
    +	 * Creates a KafkaTableSink.
    +	 *
    +	 * @param topic                 Kafka topic to write to.
    +	 * @param properties            Properties for the Kafka producer.
    +	 * @param serializationSchema   Serialization schema for emitted items.
    +	 * @param partitioner           Partitioner to select the Kafka partition for each item.
    +	 * @param fieldNames            Row field names.
    +	 * @param fieldTypes            Row field types.
    +	 */
    +	public KafkaTableSink(
    +			String topic,
    +			Properties properties,
    +			SerializationSchema<Row> serializationSchema,
    +			KafkaPartitioner<Row> partitioner,
    +			String[] fieldNames,
    +			TypeInformation<?>[] fieldTypes) {
    +
    +		this.topic = Preconditions.checkNotNull(topic, "topic");
    +		this.properties = Preconditions.checkNotNull(properties, "properties");
    +		this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema");
    +		this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
    +		this.fieldTypes = Preconditions.checkNotNull(fieldTypes);
    +		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
    +			"Number of provided field names and types does not match.");
    +	}
    +
    +	@Override
    +	public void emitDataStream(DataStream<Row> dataStream) {
    +		FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
    +		dataStream.addSink(kafkaProducer);
    +	}
    +
    +	abstract protected FlinkKafkaProducerBase<Row> createKafkaProducer(
    --- End diff --
    
    Done.



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    The last build failed on an outdated version of this branch. I pushed an updated and fixed version of this PR to trigger a build on the newer version.



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    @twalthr I've updated the PR according to your comments. Could you please review it again?



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    Thanks for the update @mushketyk. Regarding the `ObjectNode`, that's fine. It was just a possible optimization, but if it does not work, let's keep it as it is. I added two minor comments. Once those are addressed, the PR should be good to merge. Thanks, Fabian



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    Ah, I see. Very good :-)



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    @fhueske I've fixed the PR according to your review.
    The only thing that I didn't do is reuse `ObjectNode` on every iteration. It caused some weird NPEs during the integration tests and made the code a bit more complex.
    I can investigate it more if you think it is really worth it.
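    
    For reference, roughly what the reuse variant looks like (a sketch of what I tried, not what the PR does); it relies on Jackson's `ObjectNode.removeAll()`, and the lazy re-creation below is one plausible fix for the NPEs, since a cached node would be lost when the schema is serialized and shipped to the task managers:
    
    ```java
    private transient ObjectNode reused;
    
    @Override
    public byte[] serialize(Row row) {
        if (reused == null) {
            // re-create after the schema has been deserialized on a task manager
            reused = mapper.createObjectNode();
        }
        reused.removeAll();
        for (int i = 0; i < row.productArity(); i++) {
            reused.set(fieldNames[i], mapper.valueToTree(row.productElement(i)));
        }
        try {
            return mapper.writeValueAsBytes(reused);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize row", e);
        }
    }
    ```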



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    I tried to inherit the TableSink trait in Java code, but it seems to be impossible to inherit traits with vars in Java; therefore, I had to change the class structure there somewhat.
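    
    Roughly, the mismatch looks like this (names are schematic, not the actual Flink API): a `var` in a Scala trait surfaces to Java as an abstract getter/setter pair, and the backing field plus any initialization logic is mixed in by scalac for Scala subclasses only, so a Java subclass would have to re-implement all of it by hand:
    
    ```java
    // What `trait Configurable { var fieldNames: Array[String] }` roughly
    // looks like from the Java side after compilation:
    interface Configurable {
        String[] fieldNames();                  // getter generated for the var
        void fieldNames_$eq(String[] names);    // setter (JVM-encoded name)
    }
    
    class JavaSink implements Configurable {
        private String[] fieldNames;            // backing storage, written manually
    
        public String[] fieldNames() { return fieldNames; }
        public void fieldNames_$eq(String[] names) { this.fieldNames = names; }
    }
    ```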



[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2244
  
    @wuchong I've removed the test as you've suggested since it pretty much duplicates `TableSinkITCase`.



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74699840
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
    + */
    +public class Kafka08JsonTableSink extends KafkaJsonTableSink {
    +	/**
    +	 * Creates {@link KafkaTableSink} for Kafka 0.8
    +	 *
    +	 * @param topic topic in Kafka
    +	 * @param properties properties to connect to Kafka
    +	 * @param partitioner Kafra partitioner
    --- End diff --
    
    Kafra -> Kafka



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75938953
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.sinks.StreamTableSink;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Properties;
    +
    +/**
    + * A version-agnostic Kafka {@link StreamTableSink}.
    + *
    + * <p>The version-specific Kafka table sinks need to extend this class and
    + * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}.
    + */
    +public abstract class KafkaTableSink implements StreamTableSink<Row> {
    +
    +	private final String topic;
    +	private final Properties properties;
    +	private SerializationSchema<Row> serializationSchema;
    +	private final KafkaPartitioner<Row> partitioner;
    +	private String[] fieldNames;
    +	private TypeInformation[] fieldTypes;
    +
    +	/**
    +	 * Creates a KafkaTableSink.
    +	 *
    +	 * @param topic                 Kafka topic to write to.
    +	 * @param properties            Properties for the Kafka producer.
    +	 * @param partitioner           Partitioner to select the Kafka partition for each item.
    +	 */
    +	public KafkaTableSink(
    +			String topic,
    +			Properties properties,
    +			KafkaPartitioner<Row> partitioner) {
    +
    +		this.topic = Preconditions.checkNotNull(topic, "topic");
    +		this.properties = Preconditions.checkNotNull(properties, "properties");
    +		this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
    +	}
    +
    +	/**
    +	 * Returns the version-specific Kafka producer.
    +	 *
    +	 * @param topic               Kafka topic to produce to.
    +	 * @param properties          Properties for the Kafka producer.
    +	 * @param serializationSchema Serialization schema to use to create Kafka records.
    +	 * @param partitioner         Partitioner to select Kafka partition.
    +	 * @return The version-specific Kafka producer
    +	 */
    +	protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
    +		String topic, Properties properties,
    +		SerializationSchema<Row> serializationSchema,
    +		KafkaPartitioner<Row> partitioner);
    +
    +	protected abstract SerializationSchema<Row> createSerializationSchema(String[] fieldNames);
    +
    +	@Override
    +	public void emitDataStream(DataStream<Row> dataStream) {
    +		FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
    +		dataStream.addSink(kafkaProducer);
    +	}
    +
    +	@Override
    +	public TypeInformation<Row> getOutputType() {
    +		return new RowTypeInfo(getFieldTypes());
    +	}
    +
    +	public String[] getFieldNames() {
    +		return fieldNames;
    +	}
    +
    +	@Override
    +	public TypeInformation<?>[] getFieldTypes() {
    +		return fieldTypes;
    +	}
    +
    +	@Override
    +	public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    +		this.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
    +		this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
    +		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
    +			"Number of provided field names and types does not match.");
    +		this.serializationSchema = createSerializationSchema(fieldNames);
    +
    +		return this;
    --- End diff --
    
    Fixed.
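    
    For reference, a sketch of one way to avoid mutating the sink in place by returning a configured copy (the `createCopy()` helper is an assumption for illustration, not necessarily what the commit does):
    
    ```java
    @Override
    public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        KafkaTableSink copy = createCopy();  // hypothetical: fresh version-specific instance
        copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
        copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
        Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
            "Number of provided field names and types does not match.");
        copy.serializationSchema = createSerializationSchema(fieldNames);
        return copy;
    }
    ```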



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r74698018
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +
    +/**
    + * Serialization schema that serializes an object into JSON bytes.
    + *
    + * <p>Serializes the input {@link Row} object into a JSON string and
    + * converts it into <code>byte[]</code>.
    + *
    + * <p>Resulting <code>byte[]</code> messages can be deserialized using
    + * {@link JsonRowDeserializationSchema}.
    + */
    +public class JsonRowSerializationSchema implements SerializationSchema<Row> {
    +	/** Fields names in the input Row object */
    +	private final String[] fieldNames;
    +	/** Object mapper that is used to create output JSON objects */
    +	private static ObjectMapper mapper = new ObjectMapper();
    --- End diff --
    
    It is thread-safe. Here is a quote from the Jackson docs (http://wiki.fasterxml.com/JacksonBestPracticeThreadSafety):
    
    * Factories (ObjectMapper, JsonFactory) are thread-safe once configured: so ensure that all configuration is done from a single thread, and before instantiating anything with factory.
    
    * Reader/writer instances (like JsonParser and JsonGenerator) are not thread-safe -- there is usually no need for them to be, but if for some reason you need to access them from multiple threads, external synchronization is needed
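    
    A minimal sketch of that configure-once pattern (the enabled setting below is only a placeholder, not something this PR needs):
    
    ```java
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.SerializationFeature;
    
    final class Mappers {
        // Configured during class initialization and only read afterwards,
        // which satisfies the "configure from a single thread, before use" rule.
        static final ObjectMapper MAPPER = createMapper();
    
        private static ObjectMapper createMapper() {
            ObjectMapper mapper = new ObjectMapper();
            // all configuration happens here, before the instance is shared
            mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
            return mapper;
        }
    
        private Mappers() {}
    }
    ```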



[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2244#discussion_r75835608
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
    + */
    +public class Kafka09JsonTableSink extends KafkaJsonTableSink {
    +	/**
    +	 * Creates {@link KafkaTableSink} for Kafka 0.9
    +	 *
    +	 * @param topic topic in Kafka
    +	 * @param properties properties to connect to Kafka
    +	 * @param partitioner Kafka partitioner
    +	 */
    +	public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
    +		super(topic, properties, partitioner);
    +	}
    +
    +	@Override
    +	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
    --- End diff --
    
    Please add a space after `String topic,`

