You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:48 UTC
[18/21] flink git commit: [FLINK-6711] Activate strict checkstyle for
flink-connector-filesystem
[FLINK-6711] Activate strict checkstyle for flink-connector-filesystem
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7292c874
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7292c874
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7292c874
Branch: refs/heads/master
Commit: 7292c8743d981d61b0f860367e0266b307e1362f
Parents: fab8fe5
Author: zentol <ch...@apache.org>
Authored: Wed May 24 23:57:18 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:29 2017 +0200
----------------------------------------------------------------------
.../flink-connector-filesystem/pom.xml | 3 +-
.../connectors/fs/AvroKeyValueSinkWriter.java | 60 +++++----
.../flink/streaming/connectors/fs/Bucketer.java | 5 +-
.../flink/streaming/connectors/fs/Clock.java | 8 +-
.../connectors/fs/DateTimeBucketer.java | 18 +--
.../connectors/fs/NonRollingBucketer.java | 2 +
.../streaming/connectors/fs/RollingSink.java | 109 ++++++++--------
.../connectors/fs/SequenceFileWriter.java | 5 +-
.../connectors/fs/StreamWriterBase.java | 21 +--
.../streaming/connectors/fs/StringWriter.java | 1 +
.../streaming/connectors/fs/SystemClock.java | 2 +-
.../flink/streaming/connectors/fs/Writer.java | 4 +-
.../fs/bucketing/BasePathBucketer.java | 2 +
.../connectors/fs/bucketing/Bucketer.java | 6 +-
.../connectors/fs/bucketing/BucketingSink.java | 93 ++++++-------
.../fs/bucketing/DateTimeBucketer.java | 16 ++-
.../fs/RollingSinkFaultToleranceITCase.java | 15 +--
.../connectors/fs/RollingSinkITCase.java | 130 ++++++++-----------
.../connectors/fs/RollingSinkSecuredITCase.java | 20 +--
.../BucketingSinkFaultToleranceITCase.java | 17 ++-
.../BucketingSinkFrom12MigrationTest.java | 17 ++-
.../fs/bucketing/BucketingSinkTest.java | 39 +++---
.../fs/bucketing/RollingSinkMigrationTest.java | 7 +-
.../RollingToBucketingMigrationTest.java | 7 +-
.../src/test/resources/log4j-test.properties | 2 +-
.../src/test/resources/log4j-test.properties | 2 +-
26 files changed, 315 insertions(+), 296 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/pom.xml b/flink-connectors/flink-connector-filesystem/pom.xml
index 07b0ae1..f39758b 100644
--- a/flink-connectors/flink-connector-filesystem/pom.xml
+++ b/flink-connectors/flink-connector-filesystem/pom.xml
@@ -65,7 +65,7 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
-
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
@@ -104,7 +104,6 @@ under the License.
<type>test-jar</type>
</dependency>
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
index 3e3c86b..45e73fe 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
@@ -18,10 +18,11 @@ package org.apache.flink.streaming.connectors.fs;
* limitations under the License.
*/
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
@@ -31,15 +32,15 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Map;
+
/**
* Implementation of AvroKeyValue writer that can be used in Sink.
* Each entry would be wrapped in GenericRecord with key/value fields(same as in m/r lib)
@@ -49,7 +50,7 @@ Usage:
BucketingSink<Tuple2<Long, Long>> sink = new BucketingSink<Tuple2<Long, Long>>("/tmp/path");
sink.setBucketer(new DateTimeBucketer<Tuple2<Long, Long>>("yyyy-MM-dd/HH/mm/"));
sink.setPendingSuffix(".avro");
- Map<String,String> properties = new HashMap<>();
+ Map<String, String> properties = new HashMap<>();
Schema longSchema = Schema.create(Type.LONG);
String keySchema = longSchema.toString();
String valueSchema = longSchema.toString();
@@ -57,7 +58,7 @@ Usage:
properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema);
properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, Boolean.toString(true));
properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
-
+
sink.setWriter(new AvroSinkWriter<Long, Long>(properties));
sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB,
}
@@ -77,37 +78,37 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
private final Map<String, String> properties;
/**
- * C'tor for the writer
- * <p>
- * You can provide different properties that will be used to configure avro key-value writer as simple properties map(see example above)
+ * C'tor for the writer.
+ *
+ * <p>You can provide different properties that will be used to configure avro key-value writer as simple properties map(see example above)
* @param properties
*/
@SuppressWarnings("deprecation")
public AvroKeyValueSinkWriter(Map<String, String> properties) {
this.properties = properties;
-
+
String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA);
if (keySchemaString == null) {
throw new IllegalStateException("No key schema provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property");
}
- Schema.parse(keySchemaString);//verifying that schema valid
-
+ Schema.parse(keySchemaString); //verifying that schema valid
+
String valueSchemaString = properties.get(CONF_OUTPUT_VALUE_SCHEMA);
if (valueSchemaString == null) {
throw new IllegalStateException("No value schema provided, set '" + CONF_OUTPUT_VALUE_SCHEMA + "' property");
}
- Schema.parse(valueSchemaString);//verifying that schema valid
+ Schema.parse(valueSchemaString); //verifying that schema valid
}
- private boolean getBoolean(Map<String,String> conf, String key, boolean def) {
+ private boolean getBoolean(Map<String, String> conf, String key, boolean def) {
String value = conf.get(key);
if (value == null) {
return def;
}
return Boolean.parseBoolean(value);
}
-
- private int getInt(Map<String,String> conf, String key, int def) {
+
+ private int getInt(Map<String, String> conf, String key, int def) {
String value = conf.get(key);
if (value == null) {
return def;
@@ -116,7 +117,7 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
}
//this derived from AvroOutputFormatBase.getCompressionCodec(..)
- private CodecFactory getCompressionCodec(Map<String,String> conf) {
+ private CodecFactory getCompressionCodec(Map<String, String> conf) {
if (getBoolean(conf, CONF_COMPRESS, false)) {
int deflateLevel = getInt(conf, CONF_DEFLATE_LEVEL, CodecFactory.DEFAULT_DEFLATE_LEVEL);
int xzLevel = getInt(conf, CONF_XZ_LEVEL, CodecFactory.DEFAULT_XZ_LEVEL);
@@ -147,12 +148,12 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
@Override
public void close() throws IOException {
- super.close();//the order is important since super.close flushes inside
+ super.close(); //the order is important since super.close flushes inside
if (keyValueWriter != null) {
keyValueWriter.close();
}
}
-
+
@Override
public long flush() throws IOException {
if (keyValueWriter != null) {
@@ -184,7 +185,7 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
public Writer<Tuple2<K, V>> duplicate() {
return new AvroKeyValueSinkWriter<K, V>(properties);
}
-
+
// taken from m/r avro lib to remove dependency on it
private static final class AvroKeyValueWriter<K, V> {
/** A writer for the Avro container file. */
@@ -245,7 +246,12 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
}
}
- // taken from AvroKeyValue avro-mapr lib
+ /**
+ * A reusable Avro generic record for writing key/value pairs to the
+ * file.
+ *
+ * <p>taken from AvroKeyValue avro-mapr lib
+ */
public static class AvroKeyValue<K, V> {
/** The name of the key value pair generic record. */
public static final String KEY_VALUE_PAIR_RECORD_NAME = "KeyValuePair";
@@ -293,7 +299,7 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
/**
* Creates a KeyValuePair generic record schema.
- *
+ *
* @return A schema for a generic record with two fields: 'key' and
* 'value'.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
index 24ad6ab..9caf628 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.fs;
import org.apache.hadoop.fs.Path;
@@ -25,8 +26,8 @@ import java.io.Serializable;
* A bucketer is used with a {@link RollingSink}
* to put emitted elements into rolling files.
*
- * <p>
- * The {@code RollingSink} has one active bucket that it is writing to at a time. Whenever
+ *
+ * <p>The {@code RollingSink} has one active bucket that it is writing to at a time. Whenever
* a new element arrives it will ask the {@code Bucketer} if a new bucket should be started and
* the old one closed. The {@code Bucketer} can, for example, decide to start new buckets
* based on system time.
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
index 174707c..eb864c2 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
@@ -15,19 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.fs;
+package org.apache.flink.streaming.connectors.fs;
/**
* A clock that can provide the current time.
*
- * <p>
- * Normally this would be system time, but for testing a custom {@code Clock} can be provided.
+ *
+ * <p>Normally this would be system time, but for testing a custom {@code Clock} can be provided.
*/
public interface Clock {
/**
* Return the current system time in milliseconds.
*/
- public long currentTimeMillis();
+ long currentTimeMillis();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
index 0df8998..72b4823 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.fs;
import org.apache.hadoop.fs.Path;
@@ -29,27 +30,27 @@ import java.util.Date;
/**
* A {@link Bucketer} that assigns to buckets based on current system time.
*
- * <p>
- * The {@code DateTimeBucketer} will create directories of the following form:
+ *
+ * <p>The {@code DateTimeBucketer} will create directories of the following form:
* {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
* that was specified as a base path when creating the
* {@link RollingSink}. The {@code dateTimePath}
* is determined based on the current system time and the user provided format string.
*
- * <p>
- * {@link SimpleDateFormat} is used to derive a date string from the current system time and
+ *
+ * <p>{@link SimpleDateFormat} is used to derive a date string from the current system time and
* the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
* files will have a granularity of hours.
*
*
- * <p>
- * Example:
+ *
+ * <p>Example:
*
* <pre>{@code
* Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
* }</pre>
*
- * This will create for example the following bucket path:
+ * <p>This will create for example the following bucket path:
* {@code /base/1976-12-31-14/}
*
* @deprecated use {@link org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer} instead.
@@ -57,7 +58,7 @@ import java.util.Date;
@Deprecated
public class DateTimeBucketer implements Bucketer {
- private static Logger LOG = LoggerFactory.getLogger(DateTimeBucketer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DateTimeBucketer.class);
private static final long serialVersionUID = 1L;
@@ -95,7 +96,6 @@ public class DateTimeBucketer implements Bucketer {
this.dateFormatter = new SimpleDateFormat(formatString);
}
-
@Override
public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis()));
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
index 6854596..a03bcb5 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
@@ -15,9 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.fs;
import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
+
import org.apache.hadoop.fs.Path;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 429d00a..3d3ea05 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -15,9 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.fs;
-import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.OperatorStateStore;
@@ -33,6 +33,8 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.time.StopWatch;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -56,13 +58,13 @@ import java.util.UUID;
* Sink that emits its input elements to rolling {@link org.apache.hadoop.fs.FileSystem} files. This
* is integrated with the checkpointing mechanism to provide exactly once semantics.
*
- * <p>
- * When creating the sink a {@code basePath} must be specified. The base directory contains
+ *
+ * <p>When creating the sink a {@code basePath} must be specified. The base directory contains
* one directory for every bucket. The bucket directories themselves contain several part files.
* These contain the actual written data.
*
- * <p>
- * The sink uses a {@link Bucketer} to determine the name of bucket directories inside the
+ *
+ * <p>The sink uses a {@link Bucketer} to determine the name of bucket directories inside the
* base directory. Whenever the {@code Bucketer} returns a different directory name than
* it returned before the sink will close the current part files inside that bucket
* and start the new bucket directory. The default bucketer is a {@link DateTimeBucketer} with
@@ -71,8 +73,8 @@ import java.util.UUID;
* {@link NonRollingBucketer} if you don't want to have
* buckets but still write part files in a fault-tolerant way.
*
- * <p>
- * The filenames of the part files contain the part prefix, the parallel subtask index of the sink
+ *
+ * <p>The filenames of the part files contain the part prefix, the parallel subtask index of the sink
* and a rolling counter, for example {@code "part-1-17"}. Per default the part prefix is
* {@code "part"} but this can be
* configured using {@link #setPartPrefix(String)}. When a part file becomes bigger
@@ -80,8 +82,8 @@ import java.util.UUID;
* a new part file is created. The batch size defaults to {@code 384MB}, this can be configured
* using {@link #setBatchSize(long)}.
*
- * <p>
- * Part files can be in one of three states: in-progress, pending or finished. The reason for this
+ *
+ * <p>Part files can be in one of three states: in-progress, pending or finished. The reason for this
* is how the sink works together with the checkpointing mechanism to provide exactly-once semantics
* and fault-tolerance. The part file that is currently being written to is in-progress. Once
* a part file is closed for writing it becomes pending. When a checkpoint is successful the
@@ -95,21 +97,21 @@ import java.util.UUID;
* the different file states and valid-length files can be configured, for example with
* {@link #setPendingSuffix(String)}.
*
- * <p>
- * Note: If checkpointing is not enabled the pending files will never be moved to the finished state.
+ *
+ * <p>Note: If checkpointing is not enabled the pending files will never be moved to the finished state.
* In that case, the pending suffix/prefix can be set to {@code ""} to make the sink work
* in a non-fault-tolerant way but still provide output without prefixes and suffixes.
*
- * <p>
- * The part files are written using an instance of {@link Writer}. By default
+ *
+ * <p>The part files are written using an instance of {@link Writer}. By default
* {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result
* of {@code toString()} for every element. Separated by newlines. You can configure the writer
* using {@link #setWriter(Writer)}. For example,
* {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter} can be used to write
* Hadoop {@code SequenceFiles}.
*
- * <p>
- * Example:
+ *
+ * <p>Example:
*
* <pre>{@code
* new RollingSink<Tuple2<IntWritable, Text>>(outPath)
@@ -117,7 +119,7 @@ import java.util.UUID;
* .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")
* }</pre>
*
- * This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
+ * <p>This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
*
* @see DateTimeBucketer
* @see StringWriter
@@ -134,8 +136,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
private static final long serialVersionUID = 1L;
- private static Logger LOG = LoggerFactory.getLogger(RollingSink.class);
-
+ private static final Logger LOG = LoggerFactory.getLogger(RollingSink.class);
// --------------------------------------------------------------------------------------------
// User configuration values
@@ -145,53 +146,52 @@ public class RollingSink<T> extends RichSinkFunction<T>
/**
* The default maximum size of part files (currently {@code 384 MB}).
*/
- private final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
+ private static final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
/**
* This is used for part files that we are writing to but which were not yet confirmed
* by a checkpoint.
*/
- private final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
+ private static final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
/**
- * See above, but for prefix
+ * See above, but for prefix.
*/
- private final String DEFAULT_IN_PROGRESS_PREFIX = "_";
+ private static final String DEFAULT_IN_PROGRESS_PREFIX = "_";
/**
* This is used for part files that we are not writing to but which are not yet confirmed by
* checkpoint.
*/
- private final String DEFAULT_PENDING_SUFFIX = ".pending";
+ private static final String DEFAULT_PENDING_SUFFIX = ".pending";
/**
* See above, but for prefix.
*/
- private final String DEFAULT_PENDING_PREFIX = "_";
+ private static final String DEFAULT_PENDING_PREFIX = "_";
/**
* When truncate() is not supported on the used FileSystem we instead write a
* file along the part file with this ending that contains the length up to which
* the part file is valid.
*/
- private final String DEFAULT_VALID_SUFFIX = ".valid-length";
+ private static final String DEFAULT_VALID_SUFFIX = ".valid-length";
/**
* See above, but for prefix.
*/
- private final String DEFAULT_VALID_PREFIX = "_";
+ private static final String DEFAULT_VALID_PREFIX = "_";
/**
* The default prefix for part files.
*/
- private final String DEFAULT_PART_REFIX = "part";
+ private static final String DEFAULT_PART_REFIX = "part";
/**
* The default timeout for asynchronous operations such as recoverLease and truncate. In
* milliseconds.
*/
- private final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
-
+ private static final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
/**
* The base {@code Path} that stores all bucket directories.
@@ -228,7 +228,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
private String pendingPrefix = DEFAULT_PENDING_PREFIX;
private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
- private String validLengthPrefix= DEFAULT_VALID_PREFIX;
+ private String validLengthPrefix = DEFAULT_VALID_PREFIX;
private String partPrefix = DEFAULT_PART_REFIX;
@@ -242,7 +242,6 @@ public class RollingSink<T> extends RichSinkFunction<T>
// Internal fields (not configurable by user)
// --------------------------------------------------------------------------------------------
-
/**
* The part file that we are currently writing to.
*/
@@ -266,7 +265,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
/**
* We use reflection to get the .truncate() method, this is only available starting with
- * Hadoop 2.7
+ * Hadoop 2.7.
*/
private transient Method refTruncate;
@@ -290,8 +289,8 @@ public class RollingSink<T> extends RichSinkFunction<T>
/**
* Creates a new {@code RollingSink} that writes files to the given base directory.
*
- * <p>
- * This uses a {@link DateTimeBucketer} as bucketer and a {@link StringWriter} as writer.
+ *
+ * <p>This uses a {@link DateTimeBucketer} as bucketer and a {@link StringWriter} as writer.
* The maximum bucket size is set to 384 MB.
*
* @param basePath The directory to which to write the bucket files.
@@ -319,7 +318,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
*/
public RollingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
this.fsConfig = new Configuration();
- for(Map.Entry<String, String> entry : config) {
+ for (Map.Entry<String, String> entry : config) {
fsConfig.setString(entry.getKey(), entry.getValue());
}
return this;
@@ -380,7 +379,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
}
/**
- * Create a file system with the user-defined hdfs config
+ * Create a file system with the user-defined hdfs config.
* @throws IOException
*/
private void initFileSystem() throws IOException {
@@ -415,8 +414,8 @@ public class RollingSink<T> extends RichSinkFunction<T>
/**
* Determines whether we should change the bucket file we are writing to.
*
- * <p>
- * This will roll if no file was created yet, if the file size is larger than the specified size
+ *
+ * <p>This will roll if no file was created yet, if the file size is larger than the specified size
* or if the {@code Bucketer} determines that we should roll.
*/
private boolean shouldRoll() throws IOException {
@@ -449,8 +448,8 @@ public class RollingSink<T> extends RichSinkFunction<T>
/**
* Opens a new part file.
*
- * <p>
- * This closes the old bucket file and retrieves a new bucket path from the {@code Bucketer}.
+ *
+ * <p>This closes the old bucket file and retrieves a new bucket path from the {@code Bucketer}.
*/
private void openNewPartFile() throws Exception {
closeCurrentPartFile();
@@ -505,8 +504,8 @@ public class RollingSink<T> extends RichSinkFunction<T>
/**
* Closes the current part file.
*
- * <p>
- * This moves the current in-progress part file to a pending file and adds it to the list
+ *
+ * <p>This moves the current in-progress part file to a pending file and adds it to the list
* of pending files in our bucket state.
*/
private void closeCurrentPartFile() throws Exception {
@@ -526,8 +525,8 @@ public class RollingSink<T> extends RichSinkFunction<T>
/**
* Gets the truncate() call using reflection.
- * <p>
- * <b>NOTE: </b>This code comes from Flume
+ *
+ * <p><b>NOTE: </b>This code comes from Flume
*/
private Method reflectTruncate(FileSystem fs) {
Method m = null;
@@ -604,7 +603,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
"The " + getClass().getSimpleName() + " has not been properly initialized.");
int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
-
+
if (isWriterOpen) {
bucketState.currentFile = currentPartPath.toString();
bucketState.currentFileValidLength = writer.flush();
@@ -668,11 +667,11 @@ public class RollingSink<T> extends RichSinkFunction<T>
DistributedFileSystem dfs = (DistributedFileSystem) fs;
LOG.debug("Trying to recover file lease {}", partPath);
dfs.recoverLease(partPath);
- boolean isclosed= dfs.isFileClosed(partPath);
+ boolean isclosed = dfs.isFileClosed(partPath);
StopWatch sw = new StopWatch();
sw.start();
- while(!isclosed) {
- if(sw.getTime() > asyncTimeout) {
+ while (!isclosed) {
+ if (sw.getTime() > asyncTimeout) {
break;
}
try {
@@ -691,8 +690,8 @@ public class RollingSink<T> extends RichSinkFunction<T>
StopWatch sw = new StopWatch();
sw.start();
long newLen = fs.getFileStatus(partPath).getLen();
- while(newLen != bucketState.currentFileValidLength) {
- if(sw.getTime() > asyncTimeout) {
+ while (newLen != bucketState.currentFileValidLength) {
+ if (sw.getTime() > asyncTimeout) {
break;
}
try {
@@ -749,7 +748,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
}
} catch (IOException e) {
LOG.error("(RESTORE) Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
- throw new RuntimeException("Error while renaming pending file " + pendingPath+ " to final path " + finalPath, e);
+ throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
}
}
}
@@ -785,8 +784,8 @@ public class RollingSink<T> extends RichSinkFunction<T>
/**
* Sets the maximum bucket size in bytes.
*
- * <p>
- * When a bucket part file becomes larger than this size a new bucket part file is started and
+ *
+ * <p>When a bucket part file becomes larger than this size a new bucket part file is started and
* the old one is closed. The name of the bucket files depends on the {@link Bucketer}.
*
* @param batchSize The bucket part file size in bytes.
@@ -875,8 +874,8 @@ public class RollingSink<T> extends RichSinkFunction<T>
/**
* Disable cleanup of leftover in-progress/pending files when the sink is opened.
*
- * <p>
- * This should only be disabled if using the sink without checkpoints, to not remove
+ *
+ * <p>This should only be disabled if using the sink without checkpoints, to not remove
* the files already in the directory.
*
* @deprecated This option is deprecated and remains only for backwards compatibility.
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
index 32cadec..901589f 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -15,8 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.fs;
+package org.apache.flink.streaming.connectors.fs;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -88,7 +89,7 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
}
CompressionCodec codec = null;
-
+
Configuration conf = HadoopFileSystem.getHadoopConfiguration();
if (!compressionCodecName.equals("None")) {
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
index a04e4b5..3e9eb11 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
@@ -15,12 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.fs;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +37,7 @@ import java.util.EnumSet;
*/
public abstract class StreamWriterBase<T> implements Writer<T> {
- private static Logger LOG = LoggerFactory.getLogger(BucketingSink.class);
+ private static final Logger LOG = LoggerFactory.getLogger(BucketingSink.class);
/**
* The {@code FSDataOutputStream} for the current part file.
@@ -61,11 +63,11 @@ public abstract class StreamWriterBase<T> implements Writer<T> {
/**
* If hflush is available in this version of HDFS, then this method calls
* hflush, else it calls sync.
+ *
+ * <p>Note: This code comes from Flume
+ *
* @param os - The stream to flush/sync
* @throws java.io.IOException
- *
- * <p>
- * Note: This code comes from Flume
*/
protected void hflushOrSync(FSDataOutputStream os) throws IOException {
try {
@@ -80,8 +82,8 @@ public abstract class StreamWriterBase<T> implements Writer<T> {
String msg = "Error while trying to hflushOrSync!";
LOG.error(msg + " " + e.getCause());
Throwable cause = e.getCause();
- if(cause != null && cause instanceof IOException) {
- throw (IOException)cause;
+ if (cause != null && cause instanceof IOException) {
+ throw (IOException) cause;
}
throw new RuntimeException(msg, e);
} catch (Exception e) {
@@ -94,12 +96,11 @@ public abstract class StreamWriterBase<T> implements Writer<T> {
/**
* Gets the hflush call using reflection. Fallback to sync if hflush is not available.
*
- * <p>
- * Note: This code comes from Flume
+ * <p>Note: This code comes from Flume
*/
private Method reflectHflushOrSync(FSDataOutputStream os) {
Method m = null;
- if(os != null) {
+ if (os != null) {
Class<?> fsDataOutputStreamClass = os.getClass();
try {
m = fsDataOutputStreamClass.getMethod("hflush");
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
index 6568a86..d2ef9d6 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.fs;
import org.apache.hadoop.fs.FSDataOutputStream;
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
index 41663df..eedb370 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
@@ -15,8 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.fs;
+package org.apache.flink.streaming.connectors.fs;
/**
* A {@link Clock} that uses {@code System.currentTimeMillis()} to determine the system time.
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
index c3b4cb6..ab896c8 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
@@ -15,9 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.fs;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
+
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -58,7 +60,7 @@ public interface Writer<T> extends Serializable {
* taken. The call should close all state related to the current output file,
* including the output stream opened in {@code open}.
*/
- void close() throws IOException ;
+ void close() throws IOException;
/**
* Writes one element to the bucket file.
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
index 0bf14b3..2f325f6 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
@@ -15,9 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.fs.bucketing;
import org.apache.flink.streaming.connectors.fs.Clock;
+
import org.apache.hadoop.fs.Path;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
index 86aa9f3..f2eebf3 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
@@ -15,9 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.fs.bucketing;
import org.apache.flink.streaming.connectors.fs.Clock;
+
import org.apache.hadoop.fs.Path;
import java.io.Serializable;
@@ -26,8 +28,8 @@ import java.io.Serializable;
* A bucketer is used with a {@link BucketingSink}
* to put emitted elements into rolling files.
*
- * <p>
- * The {@code BucketingSink} can be writing to many buckets at a time, and it is responsible for managing
+ *
+ * <p>The {@code BucketingSink} can be writing to many buckets at a time, and it is responsible for managing
* a set of active buckets. Whenever a new element arrives it will ask the {@code Bucketer} for the bucket
* path the element should fall in. The {@code Bucketer} can, for example, determine buckets based on
* system time.
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 58dd4dc..70168b5 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -15,9 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.fs.bucketing;
-import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
@@ -41,6 +41,8 @@ import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.time.StopWatch;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -54,23 +56,23 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.Iterator;
/**
* Sink that emits its input elements to {@link FileSystem} files within
* buckets. This is integrated with the checkpointing mechanism to provide exactly once semantics.
*
- * <p>
- * When creating the sink a {@code basePath} must be specified. The base directory contains
+ *
+ * <p>When creating the sink a {@code basePath} must be specified. The base directory contains
* one directory for every bucket. The bucket directories themselves contain several part files,
* one for each parallel subtask of the sink. These part files contain the actual output data.
*
- * <p>
- * The sink uses a {@link Bucketer} to determine in which bucket directory each element should
+ *
+ * <p>The sink uses a {@link Bucketer} to determine in which bucket directory each element should
* be written to inside the base directory. The {@code Bucketer} can, for example, use time or
* a property of the element to determine the bucket directory. The default {@code Bucketer} is a
* {@link DateTimeBucketer} which will create one new bucket every hour. You can specify
@@ -78,8 +80,8 @@ import java.util.Iterator;
* {@link BasePathBucketer} if you don't want to have buckets but still want to write part-files
* in a fault-tolerant way.
*
- * <p>
- * The filenames of the part files contain the part prefix, the parallel subtask index of the sink
+ *
+ * <p>The filenames of the part files contain the part prefix, the parallel subtask index of the sink
* and a rolling counter. For example the file {@code "part-1-17"} contains the data from
* {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask. Per default
* the part prefix is {@code "part"} but this can be configured using {@link #setPartPrefix(String)}.
@@ -87,8 +89,8 @@ import java.util.Iterator;
* the part counter is increased and a new part file is created. The batch size defaults to {@code 384MB},
* this can be configured using {@link #setBatchSize(long)}.
*
- * <p>
- * In some scenarios, the open buckets are required to change based on time. In these cases, the sink
+ *
+ * <p>In some scenarios, the open buckets are required to change based on time. In these cases, the sink
* needs to determine when a bucket has become inactive, in order to flush and close the part file.
* To support this there are two configurable settings:
* <ol>
@@ -97,17 +99,17 @@ import java.util.Iterator;
* <li>the minimum amount of time a bucket has to not receive any data before it is considered inactive,
* configured by {@link #setInactiveBucketThreshold(long)}</li>
* </ol>
- * Both of these parameters default to {@code 60,000 ms}, or {@code 1 min}.
+ * Both of these parameters default to {@code 60, 000 ms}, or {@code 1 min}.
*
- * <p>
- * Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
+ *
+ * <p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
* The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
* semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
* a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
* pending files will be moved to {@code finished}.
*
- * <p>
- * If case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the state it
+ *
+ * <p>If case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the state it
* had when that last successful checkpoint occurred. To this end, when restoring, the restored files in {@code pending}
* state are transferred into the {@code finished} state while any {@code in-progress} files are rolled back, so that
* they do not contain data that arrived after the checkpoint from which we restore. If the {@code FileSystem} supports
@@ -117,8 +119,8 @@ import java.util.Iterator;
* to that point. The prefixes and suffixes for the different file states and valid-length files can be configured
* using the adequate setter method, e.g. {@link #setPendingSuffix(String)}.
*
- * <p>
- * <b>NOTE:</b>
+ *
+ * <p><b>NOTE:</b>
* <ol>
* <li>
* If checkpointing is not enabled the pending files will never be moved to the finished state. In that case,
@@ -134,15 +136,15 @@ import java.util.Iterator;
* </li>
* </ol>
*
- * <p>
- * Example:
+ *
+ * <p>Example:
* <pre>{@code
* new BucketingSink<Tuple2<IntWritable, Text>>(outPath)
* .setWriter(new SequenceFileWriter<IntWritable, Text>())
* .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")
* }</pre>
*
- * This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
+ * <p>This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
*
* @see DateTimeBucketer
* @see StringWriter
@@ -157,7 +159,7 @@ public class BucketingSink<T>
private static final long serialVersionUID = 1L;
- private static Logger LOG = LoggerFactory.getLogger(BucketingSink.class);
+ private static final Logger LOG = LoggerFactory.getLogger(BucketingSink.class);
// --------------------------------------------------------------------------------------------
// User configuration values
@@ -167,69 +169,68 @@ public class BucketingSink<T>
/**
* The default maximum size of part files (currently {@code 384 MB}).
*/
- private final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
+ private static final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
/**
* The default time between checks for inactive buckets. By default, {60 sec}.
*/
- private final long DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS = 60 * 1000L;
+ private static final long DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS = 60 * 1000L;
/**
* The default threshold (in {@code ms}) for marking a bucket as inactive and
* closing its part files. By default, {60 sec}.
*/
- private final long DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS = 60 * 1000L;
+ private static final long DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS = 60 * 1000L;
/**
* The suffix for {@code in-progress} part files. These are files we are
* currently writing to, but which were not yet confirmed by a checkpoint.
*/
- private final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
+ private static final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
/**
* The prefix for {@code in-progress} part files. These are files we are
* currently writing to, but which were not yet confirmed by a checkpoint.
*/
- private final String DEFAULT_IN_PROGRESS_PREFIX = "_";
+ private static final String DEFAULT_IN_PROGRESS_PREFIX = "_";
/**
* The suffix for {@code pending} part files. These are closed files that we are
* not currently writing to (inactive or reached {@link #batchSize}), but which
* were not yet confirmed by a checkpoint.
*/
- private final String DEFAULT_PENDING_SUFFIX = ".pending";
+ private static final String DEFAULT_PENDING_SUFFIX = ".pending";
/**
* The prefix for {@code pending} part files. These are closed files that we are
* not currently writing to (inactive or reached {@link #batchSize}), but which
* were not yet confirmed by a checkpoint.
*/
- private final String DEFAULT_PENDING_PREFIX = "_";
+ private static final String DEFAULT_PENDING_PREFIX = "_";
/**
* When {@code truncate()} is not supported by the used {@link FileSystem}, we create
* a file along the part file with this suffix that contains the length up to which
* the part file is valid.
*/
- private final String DEFAULT_VALID_SUFFIX = ".valid-length";
+ private static final String DEFAULT_VALID_SUFFIX = ".valid-length";
/**
* When {@code truncate()} is not supported by the used {@link FileSystem}, we create
* a file along the part file with this preffix that contains the length up to which
* the part file is valid.
*/
- private final String DEFAULT_VALID_PREFIX = "_";
+ private static final String DEFAULT_VALID_PREFIX = "_";
/**
* The default prefix for part files.
*/
- private final String DEFAULT_PART_REFIX = "part";
+ private static final String DEFAULT_PART_REFIX = "part";
/**
* The default timeout for asynchronous operations such as recoverLease and truncate (in {@code ms}).
*/
- private final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
-
+ private static final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
/**
* The base {@code Path} that stores all bucket directories.
@@ -259,7 +260,7 @@ public class BucketingSink<T>
private String pendingPrefix = DEFAULT_PENDING_PREFIX;
private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
- private String validLengthPrefix= DEFAULT_VALID_PREFIX;
+ private String validLengthPrefix = DEFAULT_VALID_PREFIX;
private String partPrefix = DEFAULT_PART_REFIX;
@@ -273,7 +274,7 @@ public class BucketingSink<T>
// -------------------------------------------§-------------------------------------------------
/**
- * We use reflection to get the .truncate() method, this is only available starting with Hadoop 2.7
+ * We use reflection to get the .truncate() method, this is only available starting with Hadoop 2.7 .
*/
private transient Method refTruncate;
@@ -286,7 +287,7 @@ public class BucketingSink<T>
private transient ListState<State<T>> restoredBucketStates;
/**
- * User-defined FileSystem parameters
+ * User-defined FileSystem parameters.
*/
private Configuration fsConfig;
@@ -302,8 +303,8 @@ public class BucketingSink<T>
/**
* Creates a new {@code BucketingSink} that writes files to the given base directory.
*
- * <p>
- * This uses a{@link DateTimeBucketer} as {@link Bucketer} and a {@link StringWriter} has writer.
+ *
+ * <p>This uses a{@link DateTimeBucketer} as {@link Bucketer} and a {@link StringWriter} has writer.
* The maximum bucket size is set to 384 MB.
*
* @param basePath The directory to which to write the bucket files.
@@ -330,7 +331,7 @@ public class BucketingSink<T>
*/
public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
this.fsConfig = new Configuration();
- for(Map.Entry<String, String> entry : config) {
+ for (Map.Entry<String, String> entry : config) {
fsConfig.setString(entry.getKey(), entry.getValue());
}
return this;
@@ -572,12 +573,12 @@ public class BucketingSink<T>
/**
* Gets the truncate() call using reflection.
- * <p>
- * <b>NOTE:</b> This code comes from Flume.
+ *
+ * <p><b>NOTE:</b> This code comes from Flume.
*/
private Method reflectTruncate(FileSystem fs) {
Method m = null;
- if(fs != null) {
+ if (fs != null) {
Class<?> fsClass = fs.getClass();
try {
m = fsClass.getMethod("truncate", Path.class, long.class);
@@ -897,8 +898,8 @@ public class BucketingSink<T>
/**
* Sets the maximum bucket size in bytes.
*
- * <p>
- * When a bucket part file becomes larger than this size a new bucket part file is started and
+ *
+ * <p>When a bucket part file becomes larger than this size a new bucket part file is started and
* the old one is closed. The name of the bucket files depends on the {@link Bucketer}.
*
* @param batchSize The bucket part file size in bytes.
@@ -1008,8 +1009,8 @@ public class BucketingSink<T>
/**
* Disable cleanup of leftover in-progress/pending files when the sink is opened.
*
- * <p>
- * This should only be disabled if using the sink without checkpoints, to not remove
+ *
+ * <p>This should only be disabled if using the sink without checkpoints, to not remove
* the files already in the directory.
*
* @deprecated This option is deprecated and remains only for backwards compatibility.
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
index b985e14..b7035fe 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
@@ -15,9 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.fs.bucketing;
import org.apache.flink.streaming.connectors.fs.Clock;
+
import org.apache.hadoop.fs.Path;
import java.io.IOException;
@@ -28,27 +30,27 @@ import java.util.Date;
/**
* A {@link Bucketer} that assigns to buckets based on current system time.
*
- * <p>
- * The {@code DateTimeBucketer} will create directories of the following form:
+ *
+ * <p>The {@code DateTimeBucketer} will create directories of the following form:
* {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
* that was specified as a base path when creating the
* {@link BucketingSink}. The {@code dateTimePath}
* is determined based on the current system time and the user provided format string.
*
- * <p>
- * {@link SimpleDateFormat} is used to derive a date string from the current system time and
+ *
+ * <p>{@link SimpleDateFormat} is used to derive a date string from the current system time and
* the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
* files will have a granularity of hours.
*
*
- * <p>
- * Example:
+ *
+ * <p>Example:
*
* <pre>{@code
* Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
* }</pre>
*
- * This will create for example the following bucket path:
+ * <p>This will create for example the following bucket path:
* {@code /base/1976-12-31-14/}
*
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
index 2d8492f..b096db4 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -15,9 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.fs;
-import com.google.common.collect.Sets;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.apache.flink.util.NetUtils;
+
+import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -55,8 +57,8 @@ import static org.junit.Assert.assertTrue;
/**
* Tests for {@link org.apache.flink.streaming.connectors.fs.RollingSink}.
*
- * <p>
- * This test only verifies the exactly once behaviour of the sink. Another test tests the
+ *
+ * <p>This test only verifies the exactly once behaviour of the sink. Another test tests the
* rolling behaviour.
*
* @deprecated should be removed with the {@link RollingSink}.
@@ -64,7 +66,7 @@ import static org.junit.Assert.assertTrue;
@Deprecated
public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase {
- final long NUM_STRINGS = 16_000;
+ private static final long NUM_STRINGS = 16_000;
@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();
@@ -105,10 +107,8 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
public void testProgram(StreamExecutionEnvironment env) {
assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
- int PARALLELISM = 12;
-
env.enableCheckpointing(20);
- env.setParallelism(PARALLELISM);
+ env.setParallelism(12);
env.disableOperatorChaining();
DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
@@ -211,7 +211,6 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
private long failurePos;
private long count;
-
OnceFailingIdentityMapper(long numElements) {
this.numElements = numElements;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 72f2f21..10d1846 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -18,15 +18,6 @@
package org.apache.flink.streaming.connectors.fs;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericData.StringType;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -45,9 +36,18 @@ import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
import org.apache.flink.util.NetUtils;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.StringType;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
@@ -55,7 +55,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -77,8 +76,8 @@ import java.util.Map;
* tests test the different output methods as well as the rolling feature using a manual clock
* that increases time in lockstep with element computation using latches.
*
- * <p>
- * This only tests the rolling behaviour of the sink. There is a separate ITCase that verifies
+ *
+ * <p>This only tests the rolling behaviour of the sink. There is a separate ITCase that verifies
* exactly once behaviour.
*
* @deprecated should be removed with the {@link RollingSink}.
@@ -128,13 +127,12 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
*/
@Test
public void testNonRollingStringWriter() throws Exception {
- final int NUM_ELEMENTS = 20;
- final int PARALLELISM = 2;
+ final int numElements = 20;
final String outPath = hdfsURI + "/string-non-rolling-out";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
+ env.setParallelism(2);
- DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+ DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
.broadcast()
.filter(new OddEvenFilter());
@@ -145,7 +143,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
.setPendingSuffix("");
source
- .map(new MapFunction<Tuple2<Integer,String>, String>() {
+ .map(new MapFunction<Tuple2<Integer, String>, String>() {
private static final long serialVersionUID = 1L;
@Override
public String map(Tuple2<Integer, String> value) throws Exception {
@@ -160,7 +158,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
- for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+ for (int i = 0; i < numElements; i += 2) {
String line = br.readLine();
Assert.assertEquals("message #" + i, line);
}
@@ -171,7 +169,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
br = new BufferedReader(new InputStreamReader(inStream));
- for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+ for (int i = 1; i < numElements; i += 2) {
String line = br.readLine();
Assert.assertEquals("message #" + i, line);
}
@@ -185,17 +183,16 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
*/
@Test
public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {
- final int NUM_ELEMENTS = 20;
- final int PARALLELISM = 2;
+ final int numElements = 20;
final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
+ env.setParallelism(2);
- DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+ DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
.broadcast()
.filter(new OddEvenFilter());
- DataStream<Tuple2<IntWritable, Text>> mapped = source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
+ DataStream<Tuple2<IntWritable, Text>> mapped = source.map(new MapFunction<Tuple2<Integer, String>, Tuple2<IntWritable, Text>>() {
private static final long serialVersionUID = 1L;
@Override
@@ -204,7 +201,6 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
}
});
-
RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
.setWriter(new SequenceFileWriter<IntWritable, Text>())
.setBucketer(new NonRollingBucketer())
@@ -227,7 +223,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
IntWritable intWritable = new IntWritable();
Text txt = new Text();
- for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+ for (int i = 0; i < numElements; i += 2) {
reader.next(intWritable, txt);
Assert.assertEquals(i, intWritable.get());
Assert.assertEquals("message #" + i, txt.toString());
@@ -244,7 +240,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
100000,
new Configuration());
- for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+ for (int i = 1; i < numElements; i += 2) {
reader.next(intWritable, txt);
Assert.assertEquals(i, intWritable.get());
Assert.assertEquals("message #" + i, txt.toString());
@@ -260,17 +256,16 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
*/
@Test
public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {
- final int NUM_ELEMENTS = 20;
- final int PARALLELISM = 2;
+ final int numElements = 20;
final String outPath = hdfsURI + "/seq-non-rolling-out";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
+ env.setParallelism(2);
- DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+ DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
.broadcast()
.filter(new OddEvenFilter());
- DataStream<Tuple2<IntWritable, Text>> mapped = source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
+ DataStream<Tuple2<IntWritable, Text>> mapped = source.map(new MapFunction<Tuple2<Integer, String>, Tuple2<IntWritable, Text>>() {
private static final long serialVersionUID = 1L;
@Override
@@ -279,7 +274,6 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
}
});
-
RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
.setWriter(new SequenceFileWriter<IntWritable, Text>("Default", SequenceFile.CompressionType.BLOCK))
.setBucketer(new NonRollingBucketer())
@@ -302,7 +296,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
IntWritable intWritable = new IntWritable();
Text txt = new Text();
- for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+ for (int i = 0; i < numElements; i += 2) {
reader.next(intWritable, txt);
Assert.assertEquals(i, intWritable.get());
Assert.assertEquals("message #" + i, txt.toString());
@@ -319,7 +313,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
100000,
new Configuration());
- for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+ for (int i = 1; i < numElements; i += 2) {
reader.next(intWritable, txt);
Assert.assertEquals(i, intWritable.get());
Assert.assertEquals("message #" + i, txt.toString());
@@ -328,25 +322,22 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
reader.close();
inStream.close();
}
-
-
+
/**
* This tests {@link AvroKeyValueSinkWriter}
* with non-rolling output and without compression.
*/
@Test
public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws Exception {
- final int NUM_ELEMENTS = 20;
- final int PARALLELISM = 2;
+ final int numElements = 20;
final String outPath = hdfsURI + "/avro-kv-no-comp-non-rolling-out";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
+ env.setParallelism(2);
- DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+ DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
.broadcast()
.filter(new OddEvenFilter());
-
Map<String, String> properties = new HashMap<>();
Schema keySchema = Schema.create(Type.INT);
Schema valueSchema = Schema.create(Type.STRING);
@@ -369,7 +360,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<GenericRecord>(elementSchema);
DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
- for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+ for (int i = 0; i < numElements; i += 2) {
AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
int key = wrappedEntry.getKey().intValue();
Assert.assertEquals(i, key);
@@ -383,7 +374,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
inStream = dfs.open(new Path(outPath + "/part-1-0"));
dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
- for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+ for (int i = 1; i < numElements; i += 2) {
AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
int key = wrappedEntry.getKey().intValue();
Assert.assertEquals(i, key);
@@ -394,24 +385,22 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
dataFileStream.close();
inStream.close();
}
-
+
/**
* This tests {@link AvroKeyValueSinkWriter}
* with non-rolling output and with compression.
*/
@Test
public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception {
- final int NUM_ELEMENTS = 20;
- final int PARALLELISM = 2;
+ final int numElements = 20;
final String outPath = hdfsURI + "/avro-kv-no-comp-non-rolling-out";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
+ env.setParallelism(2);
- DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+ DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
.broadcast()
.filter(new OddEvenFilter());
-
Map<String, String> properties = new HashMap<>();
Schema keySchema = Schema.create(Type.INT);
Schema valueSchema = Schema.create(Type.STRING);
@@ -436,7 +425,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<GenericRecord>(elementSchema);
DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
- for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+ for (int i = 0; i < numElements; i += 2) {
AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
int key = wrappedEntry.getKey().intValue();
Assert.assertEquals(i, key);
@@ -450,7 +439,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
inStream = dfs.open(new Path(outPath + "/part-1-0"));
dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
- for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+ for (int i = 1; i < numElements; i += 2) {
AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
int key = wrappedEntry.getKey().intValue();
Assert.assertEquals(i, key);
@@ -462,20 +451,18 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
inStream.close();
}
-
/**
- * This tests user defined hdfs configuration
+ * This tests user defined hdfs configuration.
* @throws Exception
*/
@Test
public void testUserDefinedConfiguration() throws Exception {
- final int NUM_ELEMENTS = 20;
- final int PARALLELISM = 2;
+ final int numElements = 20;
final String outPath = hdfsURI + "/string-non-rolling-with-config";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
+ env.setParallelism(2);
- DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+ DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
.broadcast()
.filter(new OddEvenFilter());
@@ -490,7 +477,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
.setPendingSuffix("");
source
- .map(new MapFunction<Tuple2<Integer,String>, String>() {
+ .map(new MapFunction<Tuple2<Integer, String>, String>() {
private static final long serialVersionUID = 1L;
@Override
public String map(Tuple2<Integer, String> value) throws Exception {
@@ -505,7 +492,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
- for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+ for (int i = 0; i < numElements; i += 2) {
String line = br.readLine();
Assert.assertEquals("message #" + i, line);
}
@@ -516,7 +503,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
br = new BufferedReader(new InputStreamReader(inStream));
- for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+ for (int i = 1; i < numElements; i += 2) {
String line = br.readLine();
Assert.assertEquals("message #" + i, line);
}
@@ -525,8 +512,8 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
}
// we use this to synchronize the clock changes to elements being processed
- final static MultiShotLatch latch1 = new MultiShotLatch();
- final static MultiShotLatch latch2 = new MultiShotLatch();
+ private static final MultiShotLatch latch1 = new MultiShotLatch();
+ private static final MultiShotLatch latch2 = new MultiShotLatch();
/**
* This uses {@link org.apache.flink.streaming.connectors.fs.DateTimeBucketer} to
@@ -536,19 +523,16 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
*/
@Test
public void testDateTimeRollingStringWriter() throws Exception {
- final int NUM_ELEMENTS = 20;
- final int PARALLELISM = 2;
+ final int numElements = 20;
final String outPath = hdfsURI + "/rolling-out";
DateTimeBucketer.setClock(new ModifyableClock());
ModifyableClock.setCurrentTime(0);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
-
-
+ env.setParallelism(2);
DataStream<Tuple2<Integer, String>> source = env.addSource(new WaitingTestSourceFunction(
- NUM_ELEMENTS))
+ numElements))
.broadcast();
// the parallel flatMap is chained to the sink, so when it has seen 5 elements it can
@@ -664,7 +648,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
testHarness.processElement(new StreamRecord<>("test1", 1L));
testHarness.processElement(new StreamRecord<>("test2", 1L));
- checkFs(outDir, 1, 1 ,0, 0);
+ checkFs(outDir, 1, 1 , 0, 0);
testHarness.processElement(new StreamRecord<>("test3", 1L));
checkFs(outDir, 1, 2, 0, 0);
@@ -961,7 +945,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
}
}
- public static class OddEvenFilter extends RichFilterFunction<Tuple2<Integer, String>> {
+ private static class OddEvenFilter extends RichFilterFunction<Tuple2<Integer, String>> {
private static final long serialVersionUID = 1L;
@Override
@@ -974,7 +958,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
}
}
- public static class ModifyableClock implements Clock {
+ private static class ModifyableClock implements Clock {
private static volatile long currentTime = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 768ca5e..6bd75d4 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -26,9 +26,10 @@ import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SecureTestEnvironment;
-import org.apache.flink.test.util.TestingSecurityContext;
import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.TestingSecurityContext;
import org.apache.flink.util.NetUtils;
+
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.http.HttpConfig;
@@ -47,21 +48,21 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
/**
* Tests for running {@link RollingSinkSecuredITCase} which is an extension of {@link RollingSink} in secure environment
- * Note: only executed for Hadoop version > 3.x.x
+ * Note: only executed for Hadoop version > 3.x.x.
*/
public class RollingSinkSecuredITCase extends RollingSinkITCase {
@@ -141,7 +142,6 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
map.put("HADOOP_CONF_DIR", hdfsSiteXML.getParentFile().getAbsolutePath());
TestBaseUtils.setEnv(map);
-
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
builder.checkDataNodeAddrConfig(true);
builder.checkDataNodeHostConfig(true);
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
index 85f23b6..1ed4a7f 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
@@ -15,9 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.fs.bucketing;
-import com.google.common.collect.Sets;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.apache.flink.util.NetUtils;
+
+import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -37,11 +39,11 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
+import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.io.BufferedReader;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -55,13 +57,13 @@ import static org.junit.Assert.assertTrue;
/**
* Tests for {@link BucketingSink}.
*
- * <p>
- * This test only verifies the exactly once behaviour of the sink. Another test tests the
+ *
+ * <p>This test only verifies the exactly once behaviour of the sink. Another test tests the
* rolling behaviour.
*/
public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase {
- final long NUM_STRINGS = 16_000;
+ static final long NUM_STRINGS = 16_000;
@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();
@@ -102,10 +104,8 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB
public void testProgram(StreamExecutionEnvironment env) {
assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
- int PARALLELISM = 12;
-
env.enableCheckpointing(20);
- env.setParallelism(PARALLELISM);
+ env.setParallelism(12);
env.disableOperatorChaining();
DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
@@ -208,7 +208,6 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB
private long failurePos;
private long count;
-
OnceFailingIdentityMapper(long numElements) {
this.numElements = numElements;
}