Posted to issues@flink.apache.org by IgorBerman <gi...@git.apache.org> on 2016/04/30 17:02:24 UTC

[GitHub] flink pull request: [FLINK-3854] Support Avro key-value rolling si...

GitHub user IgorBerman opened a pull request:

    https://github.com/apache/flink/pull/1953

    [FLINK-3854] Support Avro key-value rolling sink writer

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/IgorBerman/flink flink-3854-avro-kv-sink-writer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1953.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1953
    
----
commit 212777e8f7d2dd03371c34d97ee3a27c68b48d18
Author: Igor Berman <ig...@dynamicyield.com>
Date:   2016-04-30T14:54:52Z

    [FLINK-3854] Support Avro key-value rolling sink writer

----



[GitHub] flink pull request: [FLINK-3854] Support Avro key-value rolling si...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1953#issuecomment-216627907
  
    That's a problem we currently have, since the Wikipedia IRC channel times out. Restarting wouldn't help, but in the future, if you want to restart the build you can push a new (possibly empty) commit.
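
For reference, one way to push such an empty commit from the PR branch (assuming `origin` points at the contributor's fork; the branch name is taken from the merge instructions above, and the commit message is just an example):

    $ git commit --allow-empty -m "trigger new CI build"
    $ git push origin flink-3854-avro-kv-sink-writer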



[GitHub] flink issue #1953: [FLINK-3854] Support Avro key-value rolling sink writer

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/1953
  
    I'm merging this change...



[GitHub] flink pull request: [FLINK-3854] Support Avro key-value rolling si...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1953#issuecomment-220105461
  
    We recently became more aware of build-time increases due to our heavy use of ITCases. I think we should have these tests as unit tests instead of ITCases, since unit tests run a lot quicker. In our case, we would just instantiate the writer directly, have it write, simulate a cancel, and use `getPos` before that.
    
    I'm sorry that this PR is taking so long to get in.
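
As an illustration of that suggestion, here is a minimal sketch of such a unit test (the test class name, temp path, and assertion are illustrative, not part of this PR; the schema properties are set because the updated PR validates them in the constructor):

    import static org.junit.Assert.assertTrue;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.avro.Schema;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.junit.Test;

    public class AvroKeyValueSinkWriterTest {

    	@Test
    	public void testWriteAndGetPos() throws Exception {
    		Map<String, String> properties = new HashMap<>();
    		String longSchema = Schema.create(Schema.Type.LONG).toString();
    		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, longSchema);
    		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, longSchema);

    		AvroKeyValueSinkWriter<Long, Long> writer =
    				new AvroKeyValueSinkWriter<Long, Long>(properties);

    		// write to the local file system instead of spinning up a cluster
    		FileSystem fs = FileSystem.getLocal(new Configuration());
    		Path testFile = new Path(System.getProperty("java.io.tmpdir"), "avro-kv-writer-test");

    		writer.open(fs, testFile);
    		writer.write(new Tuple2<Long, Long>(1L, 100L));

    		writer.flush();
    		long pos = writer.getPos(); // the valid length a restore would truncate to
    		writer.close();             // stands in for the "cancel" in this sketch

    		assertTrue(pos > 0);
    	}
    }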



[GitHub] flink pull request: [FLINK-3854] Support Avro key-value rolling si...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1953#issuecomment-217869409
  
    Ok, that's "normal" right now. @rmetzger, if you don't have any more objections, I would merge it in a bit.



[GitHub] flink pull request: [FLINK-3854] Support Avro key-value rolling si...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1953#discussion_r61745619
  
    --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---
    @@ -0,0 +1,308 @@
    +package org.apache.flink.streaming.connectors.fs;
    +
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.Arrays;
    +import java.util.Map;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.CodecFactory;
    +import org.apache.avro.file.DataFileConstants;
    +import org.apache.avro.file.DataFileWriter;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.generic.GenericDatumWriter;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.DatumWriter;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    +
    +/**
    +* Implementation of AvroKeyValue writer that can be used in a Sink.
    +* Each entry is wrapped in a GenericRecord with key/value fields (same as in the avro-mapred lib).
    +<pre>
    +Usage:
    +{@code
    +		RollingSink<Tuple2<Long, Long>> sink = new RollingSink<Tuple2<Long, Long>>("/tmp/path");
    +		sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/mm/"));
    +		sink.setPendingSuffix(".avro");
    +		Map<String, String> properties = new HashMap<>();
    +		Schema longSchema = Schema.create(Schema.Type.LONG);
    +		String keySchema = longSchema.toString();
    +		String valueSchema = longSchema.toString();
    +		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema);
    +		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema);
    +		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, Boolean.toString(true));
    +		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
    +
    +		sink.setWriter(new AvroKeyValueSinkWriter<Long, Long>(properties));
    +		sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB
    +}
    +</pre>
    +*/
    +public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>  implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
    +	private static final long serialVersionUID = 1L;
    +	public static final String CONF_OUTPUT_KEY_SCHEMA = "avro.schema.output.key";
    +	public static final String CONF_OUTPUT_VALUE_SCHEMA = "avro.schema.output.value";
    +	public static final String CONF_COMPRESS = FileOutputFormat.COMPRESS;
    +	public static final String CONF_COMPRESS_CODEC = FileOutputFormat.COMPRESS_CODEC;
    +	public static final String CONF_DEFLATE_LEVEL = "avro.deflate.level";
    +	public static final String CONF_XZ_LEVEL = "avro.xz.level";
    +
    +	private transient AvroKeyValueWriter<K, V> keyValueWriter;
    +
    +	private final Map<String, String> properties;
    +
    +	/**
    +	 * Constructor for the writer.
    +	 * <p>
    +	 * The properties map configures the Avro key-value writer (see the usage example above).
    +	 * @param properties configuration for the writer, as a simple properties map
    +	 */
    +	public AvroKeyValueSinkWriter(Map<String, String> properties) {
    +		this.properties = properties;
    +	}
    +
    +	private boolean getBoolean(Map<String,String> conf, String key, boolean def) {
    +		String value = conf.get(key);
    +		if (value == null) {
    +			return def;
    +		}
    +		return Boolean.parseBoolean(value);
    +	}
    +	
    +	private int getInt(Map<String,String> conf, String key, int def) {
    +		String value = conf.get(key);
    +		if (value == null) {
    +			return def;
    +		}
    +		return Integer.parseInt(value);
    +	}
    +
    +	// derived from AvroOutputFormatBase.getCompressionCodec(..)
    +	private CodecFactory getCompressionCodec(Map<String,String> conf) {
    +		if (getBoolean(conf, CONF_COMPRESS, false)) {
    +			int deflateLevel = getInt(conf, CONF_DEFLATE_LEVEL, CodecFactory.DEFAULT_DEFLATE_LEVEL);
    +			int xzLevel = getInt(conf, CONF_XZ_LEVEL, CodecFactory.DEFAULT_XZ_LEVEL);
    +
    +			String outputCodec = conf.get(CONF_COMPRESS_CODEC);
    +
    +			if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) {
    +				return CodecFactory.deflateCodec(deflateLevel);
    +			} else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) {
    +				return CodecFactory.xzCodec(xzLevel);
    +			} else {
    +				return CodecFactory.fromString(outputCodec);
    +			}
    +		}
    +		return CodecFactory.nullCodec();
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		super.open(fs, path);
    +
    +		CodecFactory compressionCodec = getCompressionCodec(properties);
    +		  		  
    +		String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA);
    +		if (keySchemaString == null) {
    +			throw new IllegalStateException("No key schema provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property");
    +		}
    +		@SuppressWarnings("deprecation")
    +		Schema keySchema = Schema.parse(keySchemaString);
    +		
    +		String valueSchemaString = properties.get(CONF_OUTPUT_VALUE_SCHEMA);
    +		if (valueSchemaString == null) {
    +			throw new IllegalStateException("No value schema provided, set '" + CONF_OUTPUT_VALUE_SCHEMA + "' property");
    --- End diff --
    
    Same here: this check should also move to the constructor.



[GitHub] flink pull request: [FLINK-3854] Support Avro key-value rolling si...

Posted by IgorBerman <gi...@git.apache.org>.
Github user IgorBerman commented on the pull request:

    https://github.com/apache/flink/pull/1953#issuecomment-216622861
  
    @aljoscha can we somehow rerun the build? I've checked: it failed on flink-connector-wikiedits, which isn't related... unless I'm missing something.
    
    Running org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSourceTest
    Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 120.022 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSourceTest
    testWikipediaEditsSource(org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSourceTest)  Time elapsed: 120.01 sec  <<< ERROR!
    java.lang.Exception: test timed out after 120000 milliseconds
    	at sun.misc.Unsafe.park(Native Method)
    	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
    	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
    	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
    	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)



[GitHub] flink pull request: [FLINK-3854] Support Avro key-value rolling si...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1953#issuecomment-216225261
  
    AFAIK the failing build is "normal" right now; we have some problems with the Travis caches.
    
    The PR looks very good. @rmetzger, what do you think about copying the code from avro-mapred instead of adding it as a dependency? I think it's fine, because we don't want such a big dependency. My only concern is that the binary format might go out of sync. But then we can adapt, I think.
    
    The tests also look good; I don't think a separate exactly-once test is needed.
    
    Did you also test this in a cluster and/or on some real-world data?



[GitHub] flink issue #1953: [FLINK-3854] Support Avro key-value rolling sink writer

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/1953
  
    The tests are executing very fast:
    ![image](https://cloud.githubusercontent.com/assets/89049/15924303/babee52c-2e31-11e6-95da-468a0ff58ca0.png)




[GitHub] flink pull request: [FLINK-3854] Support Avro key-value rolling si...

Posted by IgorBerman <gi...@git.apache.org>.
Github user IgorBerman commented on the pull request:

    https://github.com/apache/flink/pull/1953#issuecomment-216324472
  
    @aljoscha, @rmetzger thanks for the review. I've updated the PR: moved the verifications to the constructor.



[GitHub] flink pull request: [FLINK-3854] Support Avro key-value rolling si...

Posted by IgorBerman <gi...@git.apache.org>.
Github user IgorBerman commented on the pull request:

    https://github.com/apache/flink/pull/1953#issuecomment-217273709
  
    @aljoscha the tests are back to their previous state: all profiles passed except for
    JDK: openjdk7
    PROFILE="-Dhadoop.profile=1"



[GitHub] flink pull request: [FLINK-3854] Support Avro key-value rolling si...

Posted by IgorBerman <gi...@git.apache.org>.
Github user IgorBerman commented on the pull request:

    https://github.com/apache/flink/pull/1953#issuecomment-216244636
  
    @aljoscha, hi, thanks for the review.
    Not tested with real-world data yet, only with simple Avro objects (with several fields inside).
    Tested only in a local environment, verifying that it can write to S3 (with a parallelism of 8).
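
For context, wiring the writer against S3 looks the same as the Javadoc example in the diff, just with an S3 URI (hypothetical bucket name; this assumes a Hadoop S3 filesystem is configured, `properties` is set up as in that example, and `stream` stands for some DataStream<Tuple2<Long, Long>>):

    RollingSink<Tuple2<Long, Long>> sink = new RollingSink<Tuple2<Long, Long>>("s3n://my-bucket/flink-out");
    sink.setWriter(new AvroKeyValueSinkWriter<Long, Long>(properties));
    stream.addSink(sink).setParallelism(8); // parallelism of 8, as in the test described above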




[GitHub] flink pull request: [FLINK-3854] Support Avro key-value rolling si...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1953#discussion_r61745567
  
    --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---
    @@ -0,0 +1,308 @@
    +package org.apache.flink.streaming.connectors.fs;
    +
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.Arrays;
    +import java.util.Map;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.CodecFactory;
    +import org.apache.avro.file.DataFileConstants;
    +import org.apache.avro.file.DataFileWriter;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.generic.GenericDatumWriter;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.DatumWriter;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    +
    +/**
    +* Implementation of AvroKeyValue writer that can be used in a Sink.
    +* Each entry is wrapped in a GenericRecord with key/value fields (same as in the avro-mapred lib).
    +<pre>
    +Usage:
    +{@code
    +		RollingSink<Tuple2<Long, Long>> sink = new RollingSink<Tuple2<Long, Long>>("/tmp/path");
    +		sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/mm/"));
    +		sink.setPendingSuffix(".avro");
    +		Map<String, String> properties = new HashMap<>();
    +		Schema longSchema = Schema.create(Schema.Type.LONG);
    +		String keySchema = longSchema.toString();
    +		String valueSchema = longSchema.toString();
    +		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema);
    +		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema);
    +		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, Boolean.toString(true));
    +		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
    +
    +		sink.setWriter(new AvroKeyValueSinkWriter<Long, Long>(properties));
    +		sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB
    +}
    +</pre>
    +*/
    +public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>  implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
    +	private static final long serialVersionUID = 1L;
    +	public static final String CONF_OUTPUT_KEY_SCHEMA = "avro.schema.output.key";
    +	public static final String CONF_OUTPUT_VALUE_SCHEMA = "avro.schema.output.value";
    +	public static final String CONF_COMPRESS = FileOutputFormat.COMPRESS;
    +	public static final String CONF_COMPRESS_CODEC = FileOutputFormat.COMPRESS_CODEC;
    +	public static final String CONF_DEFLATE_LEVEL = "avro.deflate.level";
    +	public static final String CONF_XZ_LEVEL = "avro.xz.level";
    +
    +	private transient AvroKeyValueWriter<K, V> keyValueWriter;
    +
    +	private final Map<String, String> properties;
    +
    +	/**
    +	 * Constructor for the writer.
    +	 * <p>
    +	 * The properties map configures the Avro key-value writer (see the usage example above).
    +	 * @param properties configuration for the writer, as a simple properties map
    +	 */
    +	public AvroKeyValueSinkWriter(Map<String, String> properties) {
    +		this.properties = properties;
    +	}
    +
    +	private boolean getBoolean(Map<String,String> conf, String key, boolean def) {
    +		String value = conf.get(key);
    +		if (value == null) {
    +			return def;
    +		}
    +		return Boolean.parseBoolean(value);
    +	}
    +	
    +	private int getInt(Map<String,String> conf, String key, int def) {
    +		String value = conf.get(key);
    +		if (value == null) {
    +			return def;
    +		}
    +		return Integer.parseInt(value);
    +	}
    +
    +	// derived from AvroOutputFormatBase.getCompressionCodec(..)
    +	private CodecFactory getCompressionCodec(Map<String,String> conf) {
    +		if (getBoolean(conf, CONF_COMPRESS, false)) {
    +			int deflateLevel = getInt(conf, CONF_DEFLATE_LEVEL, CodecFactory.DEFAULT_DEFLATE_LEVEL);
    +			int xzLevel = getInt(conf, CONF_XZ_LEVEL, CodecFactory.DEFAULT_XZ_LEVEL);
    +
    +			String outputCodec = conf.get(CONF_COMPRESS_CODEC);
    +
    +			if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) {
    +				return CodecFactory.deflateCodec(deflateLevel);
    +			} else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) {
    +				return CodecFactory.xzCodec(xzLevel);
    +			} else {
    +				return CodecFactory.fromString(outputCodec);
    +			}
    +		}
    +		return CodecFactory.nullCodec();
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		super.open(fs, path);
    +
    +		CodecFactory compressionCodec = getCompressionCodec(properties);
    +		  		  
    +		String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA);
    +		if (keySchemaString == null) {
    --- End diff --
    
    I think it makes more sense to do that check in the constructor. Otherwise, the code will only fail after deployment.
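
A minimal sketch of what that early check could look like (the helper name is hypothetical; the actual commit may differ):

    public AvroKeyValueSinkWriter(Map<String, String> properties) {
    	this.properties = properties;
    	// fail fast when the job graph is built, instead of in open() after deployment
    	requireProperty(CONF_OUTPUT_KEY_SCHEMA);
    	requireProperty(CONF_OUTPUT_VALUE_SCHEMA);
    }

    // hypothetical helper, for illustration
    private void requireProperty(String key) {
    	if (properties.get(key) == null) {
    		throw new IllegalArgumentException("Missing required property '" + key + "'");
    	}
    }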



[GitHub] flink issue #1953: [FLINK-3854] Support Avro key-value rolling sink writer

Posted by IgorBerman <gi...@git.apache.org>.
Github user IgorBerman commented on the issue:

    https://github.com/apache/flink/pull/1953
  
    @rmetzger @aljoscha thanks, guys; sorry I couldn't refactor the tests, a bit of pressure at work.



[GitHub] flink pull request: [FLINK-3854] Support Avro key-value rolling si...

Posted by IgorBerman <gi...@git.apache.org>.
Github user IgorBerman commented on the pull request:

    https://github.com/apache/flink/pull/1953#issuecomment-220131404
  
    @aljoscha, ok, I'll try to refactor the tests into unit tests at the weekend.



[GitHub] flink issue #1953: [FLINK-3854] Support Avro key-value rolling sink writer

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/1953
  
    This is introducing two new ITCases, though. I thought we wanted to reduce the number of those, because they slow down the build.



[GitHub] flink pull request: [FLINK-3854] Support Avro key-value rolling si...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1953#issuecomment-216251703
  
    Overall, the code looks good. I forgot that we have the avro dependency in by default ;)
    I had one minor comment regarding early input validation; other than that, the change is good to merge.
    
    If more rolling file sinks are coming, we might need to put them into dedicated Maven modules to avoid a dependency mess for the module. But avro is an exception here ;)



[GitHub] flink pull request #1953: [FLINK-3854] Support Avro key-value rolling sink w...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1953



[GitHub] flink pull request: [FLINK-3854] Support Avro key-value rolling si...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1953#issuecomment-216247252
  
    @rmetzger this is good enough for me, since there are also tests. What do you think?



[GitHub] flink issue #1953: [FLINK-3854] Support Avro key-value rolling sink writer

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/1953
  
    Thank you for the contribution @IgorBerman



[GitHub] flink pull request: [FLINK-3854] Support Avro key-value rolling si...

Posted by IgorBerman <gi...@git.apache.org>.
Github user IgorBerman commented on the pull request:

    https://github.com/apache/flink/pull/1953#issuecomment-216031368
  
    All profiles passed except for:
     JDK: openjdk7
     PROFILE="-Dhadoop.profile=1"

