You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:47 UTC
[17/21] flink git commit: [FLINK-6711] Activate strict checkstyle for
flink-connector-filesystem
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFrom12MigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFrom12MigrationTest.java
index 350b7b4..e3db8bb 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFrom12MigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFrom12MigrationTest.java
@@ -15,15 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.fs.bucketing;
-import static org.junit.Assert.assertTrue;
+package org.apache.flink.streaming.connectors.fs.bucketing;
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -33,6 +27,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.ClassRule;
@@ -40,6 +36,13 @@ import org.junit.Ignore;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
/**
* Tests for checking whether {@link BucketingSink} can restore from snapshots that were done
* using the Flink 1.2 {@link BucketingSink}.
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index 090c54a..67af91f 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -15,15 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.fs.bucketing;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -39,6 +33,14 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.NetUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -50,10 +52,10 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
-import org.junit.Assert;
import org.junit.rules.TemporaryFolder;
import java.io.BufferedReader;
@@ -63,6 +65,9 @@ import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
+/**
+ * Tests for the {@link BucketingSink}.
+ */
public class BucketingSinkTest {
@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();
@@ -115,8 +120,8 @@ public class BucketingSinkTest {
.setWriter(new StringWriter<String>())
.setPartPrefix(PART_PREFIX)
.setPendingPrefix("")
- .setInactiveBucketCheckInterval(5*60*1000L)
- .setInactiveBucketThreshold(5*60*1000L)
+ .setInactiveBucketCheckInterval(5 * 60 * 1000L)
+ .setInactiveBucketThreshold(5 * 60 * 1000L)
.setPendingSuffix(PENDING_SUFFIX)
.setInProgressSuffix(IN_PROGRESS_SUFFIX);
@@ -175,7 +180,7 @@ public class BucketingSinkTest {
testHarness.processElement(new StreamRecord<>("test1", 1L));
testHarness.processElement(new StreamRecord<>("test2", 1L));
- checkFs(outDir, 2, 0 ,0, 0);
+ checkFs(outDir, 2, 0, 0, 0);
testHarness.setProcessingTime(101L); // put some in pending
checkFs(outDir, 0, 2, 0, 0);
@@ -210,7 +215,7 @@ public class BucketingSinkTest {
testHarness.processElement(new StreamRecord<>("test1", 1L));
testHarness.processElement(new StreamRecord<>("test2", 1L));
- checkFs(outDir, 2, 0 ,0, 0);
+ checkFs(outDir, 2, 0, 0, 0);
// this is to check the inactivity threshold
testHarness.setProcessingTime(101L);
@@ -758,13 +763,13 @@ public class BucketingSinkTest {
testHarness.processElement(new StreamRecord<>(Integer.toString(i % step1NumIds)));
}
- testHarness.setProcessingTime(2*60*1000L);
+ testHarness.setProcessingTime(2 * 60 * 1000L);
for (int i = 0; i < numElementsPerStep; i++) {
testHarness.processElement(new StreamRecord<>(Integer.toString(i % step2NumIds)));
}
- testHarness.setProcessingTime(6*60*1000L);
+ testHarness.setProcessingTime(6 * 60 * 1000L);
for (int i = 0; i < numElementsPerStep; i++) {
testHarness.processElement(new StreamRecord<>(Integer.toString(i % step2NumIds)));
@@ -791,7 +796,7 @@ public class BucketingSinkTest {
}
/**
- * This tests user defined hdfs configuration
+ * This tests user defined hdfs configuration.
* @throws Exception
*/
@Test
@@ -810,10 +815,10 @@ public class BucketingSinkTest {
Configuration conf = new Configuration();
conf.set("io.file.buffer.size", "40960");
- BucketingSink<Tuple2<Integer,String>> sink = new BucketingSink<Tuple2<Integer, String>>(outPath)
+ BucketingSink<Tuple2<Integer, String>> sink = new BucketingSink<Tuple2<Integer, String>>(outPath)
.setFSConfig(conf)
.setWriter(new StreamWriterWithConfigCheck<Integer, String>(properties, "io.file.buffer.size", "40960"))
- .setBucketer(new BasePathBucketer<Tuple2<Integer,String>>())
+ .setBucketer(new BasePathBucketer<Tuple2<Integer, String>>())
.setPartPrefix(PART_PREFIX)
.setPendingPrefix("")
.setPendingSuffix("");
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
index 3355fae..75eb685 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
@@ -15,14 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.fs.bucketing;
-import org.apache.commons.io.FileUtils;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
@@ -34,6 +36,9 @@ import java.net.URL;
import java.util.List;
import java.util.Map;
+/**
+ * Tests the migration from 1.1 snapshots.
+ */
@Deprecated
public class RollingSinkMigrationTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
index 257b157..ed4ab88 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
@@ -15,14 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.fs.bucketing;
-import org.apache.commons.io.FileUtils;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
@@ -34,6 +36,9 @@ import java.net.URL;
import java.util.List;
import java.util.Map;
+/**
+ * Tests the migration from {@link RollingSink} to {@link BucketingSink}.
+ */
public class RollingToBucketingMigrationTest {
@ClassRule
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
index 5c22851..490767a 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
@@ -26,4 +26,4 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file
+log4j.logger.org.apache.directory=OFF, testlogger
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties b/flink-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
index 0b686e5..881dc06 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
@@ -24,4 +24,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n