You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:50 UTC
[20/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-avro
[FLINK-6711] Activate strict checkstyle for flink-avro
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b58545ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b58545ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b58545ec
Branch: refs/heads/master
Commit: b58545ecde76ae88be11ebdc305adbd9b132d302
Parents: 1a3a5b6
Author: zentol <ch...@apache.org>
Authored: Fri May 26 00:15:34 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:35 2017 +0200
----------------------------------------------------------------------
flink-connectors/flink-avro/pom.xml | 2 +-
.../apache/flink/api/avro/DataInputDecoder.java | 47 +++--
.../flink/api/avro/DataOutputEncoder.java | 47 +++--
.../api/avro/FSDataInputStreamWrapper.java | 13 +-
.../flink/api/java/io/AvroInputFormat.java | 40 ++---
.../flink/api/java/io/AvroOutputFormat.java | 30 ++--
.../src/test/assembly/test-assembly.xml | 4 +-
.../api/avro/AvroExternalJarProgramITCase.java | 14 +-
.../flink/api/avro/AvroOutputFormatITCase.java | 39 +++--
.../flink/api/avro/EncoderDecoderTest.java | 173 ++++++++++---------
.../avro/testjar/AvroExternalJarProgram.java | 132 +++++++-------
.../apache/flink/api/io/avro/AvroPojoTest.java | 24 +--
.../api/io/avro/AvroRecordInputFormatTest.java | 150 ++++++++--------
.../io/avro/AvroSplittableInputFormatTest.java | 133 +++++++-------
.../api/io/avro/example/AvroTypeExample.java | 46 +++--
.../io/AvroInputFormatTypeExtractionTest.java | 9 +-
.../flink/api/java/io/AvroOutputFormatTest.java | 35 ++--
.../src/test/resources/avro/user.avsc | 4 +-
18 files changed, 469 insertions(+), 473 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/pom.xml b/flink-connectors/flink-avro/pom.xml
index 5082924..d057177 100644
--- a/flink-connectors/flink-avro/pom.xml
+++ b/flink-connectors/flink-avro/pom.xml
@@ -19,7 +19,7 @@ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
+
<modelVersion>4.0.0</modelVersion>
<parent>
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
index 59da4cb..870d66f 100644
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
@@ -18,20 +18,22 @@
package org.apache.flink.api.avro;
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
import org.apache.avro.io.Decoder;
import org.apache.avro.util.Utf8;
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+/**
+ * A {@link Decoder} that reads from a {@link DataInput}.
+ */
public class DataInputDecoder extends Decoder {
-
+
private final Utf8 stringDecoder = new Utf8();
-
+
private DataInput in;
-
+
public void setIn(DataInput in) {
this.in = in;
}
@@ -39,10 +41,9 @@ public class DataInputDecoder extends Decoder {
// --------------------------------------------------------------------------------------------
// primitives
// --------------------------------------------------------------------------------------------
-
+
@Override
public void readNull() {}
-
@Override
public boolean readBoolean() throws IOException {
@@ -68,12 +69,12 @@ public class DataInputDecoder extends Decoder {
public double readDouble() throws IOException {
return in.readDouble();
}
-
+
@Override
public int readEnum() throws IOException {
return readInt();
}
-
+
// --------------------------------------------------------------------------------------------
// bytes
// --------------------------------------------------------------------------------------------
@@ -82,7 +83,7 @@ public class DataInputDecoder extends Decoder {
public void readFixed(byte[] bytes, int start, int length) throws IOException {
in.readFully(bytes, start, length);
}
-
+
@Override
public ByteBuffer readBytes(ByteBuffer old) throws IOException {
int length = readInt();
@@ -97,34 +98,32 @@ public class DataInputDecoder extends Decoder {
result.limit(length);
return result;
}
-
-
+
@Override
public void skipFixed(int length) throws IOException {
skipBytes(length);
}
-
+
@Override
public void skipBytes() throws IOException {
int num = readInt();
skipBytes(num);
}
-
+
// --------------------------------------------------------------------------------------------
// strings
// --------------------------------------------------------------------------------------------
-
-
+
@Override
public Utf8 readString(Utf8 old) throws IOException {
int length = readInt();
Utf8 result = (old != null ? old : new Utf8());
result.setByteLength(length);
-
+
if (length > 0) {
in.readFully(result.getBytes(), 0, length);
}
-
+
return result;
}
@@ -172,7 +171,7 @@ public class DataInputDecoder extends Decoder {
public long skipMap() throws IOException {
return readVarLongCount(in);
}
-
+
// --------------------------------------------------------------------------------------------
// union
// --------------------------------------------------------------------------------------------
@@ -181,17 +180,17 @@ public class DataInputDecoder extends Decoder {
public int readIndex() throws IOException {
return readInt();
}
-
+
// --------------------------------------------------------------------------------------------
// utils
// --------------------------------------------------------------------------------------------
-
+
private void skipBytes(int num) throws IOException {
while (num > 0) {
num -= in.skipBytes(num);
}
}
-
+
public static long readVarLongCount(DataInput in) throws IOException {
long value = in.readUnsignedByte();
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
index 0102cc1..beae330 100644
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
@@ -18,36 +18,35 @@
package org.apache.flink.api.avro;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
import org.apache.avro.io.Encoder;
import org.apache.avro.util.Utf8;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+/**
+ * An {@link Encoder} that writes data to a {@link DataOutput}.
+ */
public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
-
+
private static final long serialVersionUID = 1L;
-
+
private DataOutput out;
-
-
+
public void setOut(DataOutput out) {
this.out = out;
}
-
@Override
public void flush() throws IOException {}
// --------------------------------------------------------------------------------------------
// primitives
// --------------------------------------------------------------------------------------------
-
+
@Override
public void writeNull() {}
-
@Override
public void writeBoolean(boolean b) throws IOException {
@@ -73,13 +72,12 @@ public final class DataOutputEncoder extends Encoder implements java.io.Serializ
public void writeDouble(double d) throws IOException {
out.writeDouble(d);
}
-
+
@Override
public void writeEnum(int e) throws IOException {
out.writeInt(e);
}
-
-
+
// --------------------------------------------------------------------------------------------
// bytes
// --------------------------------------------------------------------------------------------
@@ -88,7 +86,7 @@ public final class DataOutputEncoder extends Encoder implements java.io.Serializ
public void writeFixed(byte[] bytes, int start, int len) throws IOException {
out.write(bytes, start, len);
}
-
+
@Override
public void writeBytes(byte[] bytes, int start, int len) throws IOException {
out.writeInt(len);
@@ -96,17 +94,17 @@ public final class DataOutputEncoder extends Encoder implements java.io.Serializ
out.write(bytes, start, len);
}
}
-
+
@Override
public void writeBytes(ByteBuffer bytes) throws IOException {
int num = bytes.remaining();
out.writeInt(num);
-
+
if (num > 0) {
writeFixed(bytes);
}
}
-
+
// --------------------------------------------------------------------------------------------
// strings
// --------------------------------------------------------------------------------------------
@@ -116,11 +114,11 @@ public final class DataOutputEncoder extends Encoder implements java.io.Serializ
byte[] bytes = Utf8.getBytesFor(str);
writeBytes(bytes, 0, bytes.length);
}
-
+
@Override
public void writeString(Utf8 utf8) throws IOException {
writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
-
+
}
// --------------------------------------------------------------------------------------------
@@ -158,22 +156,21 @@ public final class DataOutputEncoder extends Encoder implements java.io.Serializ
// --------------------------------------------------------------------------------------------
// union
// --------------------------------------------------------------------------------------------
-
+
@Override
public void writeIndex(int unionIndex) throws IOException {
out.writeInt(unionIndex);
}
-
+
// --------------------------------------------------------------------------------------------
// utils
// --------------------------------------------------------------------------------------------
-
-
+
public static void writeVarLongCount(DataOutput out, long val) throws IOException {
if (val < 0) {
throw new IOException("Illegal count (must be non-negative): " + val);
}
-
+
while ((val & ~0x7FL) != 0) {
out.write(((int) val) | 0x80);
val >>>= 7;
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
index 709c4f1..19e4a89 100644
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
@@ -16,20 +16,19 @@
* limitations under the License.
*/
-
package org.apache.flink.api.avro;
-import java.io.Closeable;
-import java.io.IOException;
+import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.avro.file.SeekableInput;
-import org.apache.flink.core.fs.FSDataInputStream;
+import java.io.Closeable;
+import java.io.IOException;
/**
- * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well)
- *
- * The wrapper keeps track of the position in the data stream.
+ * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well).
+ *
+ * <p>The wrapper keeps track of the position in the data stream.
*/
public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
private final FSDataInputStream stream;
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
index 73067c1..33105cc 100644
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -16,10 +16,19 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.io;
-import java.io.IOException;
+import org.apache.flink.api.avro.FSDataInputStreamWrapper;
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableInput;
@@ -28,19 +37,10 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.flink.api.common.io.CheckpointableInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
/**
* Provides a {@link FileInputFormat} for Avro records.
@@ -59,7 +59,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
private final Class<E> avroValueType;
-
+
private boolean reuseAvroValue = true;
private transient DataFileReader<E> dataFileReader;
@@ -68,7 +68,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
private transient long recordsReadSinceLastSync;
- private long lastSync = -1l;
+ private long lastSync = -1L;
public AvroInputFormat(Path filePath, Class<E> type) {
super(filePath);
@@ -91,16 +91,16 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
public void setUnsplittable(boolean unsplittable) {
this.unsplittable = unsplittable;
}
-
+
// --------------------------------------------------------------------------------------------
// Typing
// --------------------------------------------------------------------------------------------
-
+
@Override
public TypeInformation<E> getProducedType() {
return TypeExtractor.getForClass(this.avroValueType);
}
-
+
// --------------------------------------------------------------------------------------------
// Input Format Methods
// --------------------------------------------------------------------------------------------
@@ -155,7 +155,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
// if we start a new block, then register the event, and
// restart the counter.
- if(dataFileReader.previousSync() != lastSync) {
+ if (dataFileReader.previousSync() != lastSync) {
lastSync = dataFileReader.previousSync();
recordsReadSinceLastSync = 0;
}
@@ -199,7 +199,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
// open and read until the record we were before
// the checkpoint and discard the values
dataFileReader.seek(lastSync);
- for(int i = 0; i < recordsReadSinceLastSync; i++) {
+ for (int i = 0; i < recordsReadSinceLastSync; i++) {
dataFileReader.next(null);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
index ae90362..aed40bf 100644
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -15,8 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.java.io;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.Path;
+
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
@@ -24,15 +29,16 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.fs.Path;
import java.io.IOException;
import java.io.Serializable;
import static org.apache.flink.util.Preconditions.checkNotNull;
+/**
+ * {@link FileOutputFormat} for Avro records.
+ * @param <E>
+ */
public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
/**
@@ -40,11 +46,11 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
*/
public enum Codec {
- NULL((byte)0, CodecFactory.nullCodec()),
- SNAPPY((byte)1, CodecFactory.snappyCodec()),
- BZIP2((byte)2, CodecFactory.bzip2Codec()),
- DEFLATE((byte)3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
- XZ((byte)4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));
+ NULL((byte) 0, CodecFactory.nullCodec()),
+ SNAPPY((byte) 1, CodecFactory.snappyCodec()),
+ BZIP2((byte) 2, CodecFactory.bzip2Codec()),
+ DEFLATE((byte) 3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
+ XZ((byte) 4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));
private byte codecByte;
@@ -80,7 +86,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
private transient Schema userDefinedSchema = null;
private transient Codec codec = null;
-
+
private transient DataFileWriter<E> dataFileWriter;
public AvroOutputFormat(Path filePath, Class<E> type) {
@@ -124,7 +130,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
datumWriter = new SpecificDatumWriter<E>(avroValueType);
try {
- schema = ((org.apache.avro.specific.SpecificRecordBase)avroValueType.newInstance()).getSchema();
+ schema = ((org.apache.avro.specific.SpecificRecordBase) avroValueType.newInstance()).getSchema();
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e.getMessage());
}
@@ -152,7 +158,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
out.writeByte(-1);
}
- if(userDefinedSchema != null) {
+ if (userDefinedSchema != null) {
byte[] json = userDefinedSchema.toString().getBytes(ConfigConstants.DEFAULT_CHARSET);
out.writeInt(json.length);
out.write(json);
@@ -170,7 +176,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
}
int length = in.readInt();
- if(length != 0) {
+ if (length != 0) {
byte[] json = new byte[length];
in.readFully(json);
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml b/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
index 0f4561a..0cbdbe1 100644
--- a/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
+++ b/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
@@ -26,11 +26,11 @@ under the License.
<fileSets>
<fileSet>
<directory>${project.build.testOutputDirectory}</directory>
- <outputDirectory>/</outputDirectory>
+ <outputDirectory></outputDirectory>
<!--modify/add include to match your package(s) -->
<includes>
<include>org/apache/flink/api/avro/testjar/**</include>
</includes>
</fileSet>
</fileSets>
-</assembly>
\ No newline at end of file
+</assembly>
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index 063a363..7bcba04 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -18,21 +18,25 @@
package org.apache.flink.api.avro;
-import java.io.File;
-import java.net.URL;
-import java.util.Collections;
-
+import org.apache.flink.api.avro.testjar.AvroExternalJarProgram;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-
import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.TestLogger;
+
import org.junit.Assert;
import org.junit.Test;
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+
+/**
+ * IT case for the {@link AvroExternalJarProgram}.
+ */
public class AvroExternalJarProgramITCase extends TestLogger {
private static final String JAR_FILE = "maven-test-jar.jar";
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
index 3b01ccb..f630f41 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
@@ -18,24 +18,27 @@
package org.apache.flink.api.avro;
-import org.junit.Assert;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.io.avro.example.User;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.junit.Assert;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * IT cases for the {@link AvroOutputFormat}.
+ */
@SuppressWarnings("serial")
public class AvroOutputFormatITCase extends JavaProgramTestBase {
@@ -57,7 +60,6 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
outputPath2 = getTempDirPath("avro_output2");
}
-
@Override
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -129,11 +131,9 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult));
}
-
}
-
- public final static class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
+ private static final class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
@Override
public User map(Tuple3<String, Integer, String> value) throws Exception {
@@ -141,7 +141,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
}
}
- public final static class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
+ private static final class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
@Override
public ReflectiveUser map(User value) throws Exception {
@@ -149,8 +149,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
}
}
-
- public static class ReflectiveUser {
+ private static class ReflectiveUser {
private String name;
private int favoriteNumber;
private String favoriteColor;
@@ -162,13 +161,15 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
this.favoriteNumber = favoriteNumber;
this.favoriteColor = favoriteColor;
}
-
+
public String getName() {
return this.name;
}
+
public String getFavoriteColor() {
return this.favoriteColor;
}
+
public int getFavoriteNumber() {
return this.favoriteNumber;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
index c39db15..808c257 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
@@ -18,6 +18,16 @@
package org.apache.flink.api.avro;
+import org.apache.flink.api.io.avro.generated.Address;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.Fixed16;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.junit.Test;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -29,16 +39,9 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.io.avro.generated.Address;
-import org.apache.flink.api.io.avro.generated.Colors;
-import org.apache.flink.api.io.avro.generated.Fixed16;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.util.StringUtils;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
/**
* Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization.
@@ -48,32 +51,32 @@ public class EncoderDecoderTest {
public void testComplexStringsDirecty() {
try {
Random rnd = new Random(349712539451944123L);
-
+
for (int i = 0; i < 10; i++) {
String testString = StringUtils.getRandomString(rnd, 10, 100);
-
+
ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
{
DataOutputStream dataOut = new DataOutputStream(baos);
DataOutputEncoder encoder = new DataOutputEncoder();
encoder.setOut(dataOut);
-
+
encoder.writeString(testString);
dataOut.flush();
dataOut.close();
}
-
+
byte[] data = baos.toByteArray();
-
+
// deserialize
{
ByteArrayInputStream bais = new ByteArrayInputStream(data);
DataInputStream dataIn = new DataInputStream(bais);
DataInputDecoder decoder = new DataInputDecoder();
decoder.setIn(dataIn);
-
+
String deserialized = decoder.readString();
-
+
assertEquals(testString, deserialized);
}
}
@@ -84,49 +87,49 @@ public class EncoderDecoderTest {
fail("Test failed due to an exception: " + e.getMessage());
}
}
-
+
@Test
public void testPrimitiveTypes() {
-
+
testObjectSerialization(new Boolean(true));
testObjectSerialization(new Boolean(false));
-
+
testObjectSerialization(Byte.valueOf((byte) 0));
testObjectSerialization(Byte.valueOf((byte) 1));
testObjectSerialization(Byte.valueOf((byte) -1));
testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE));
testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE));
-
+
testObjectSerialization(Short.valueOf((short) 0));
testObjectSerialization(Short.valueOf((short) 1));
testObjectSerialization(Short.valueOf((short) -1));
testObjectSerialization(Short.valueOf(Short.MIN_VALUE));
testObjectSerialization(Short.valueOf(Short.MAX_VALUE));
-
+
testObjectSerialization(Integer.valueOf(0));
testObjectSerialization(Integer.valueOf(1));
testObjectSerialization(Integer.valueOf(-1));
testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE));
testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE));
-
+
testObjectSerialization(Long.valueOf(0));
testObjectSerialization(Long.valueOf(1));
testObjectSerialization(Long.valueOf(-1));
testObjectSerialization(Long.valueOf(Long.MIN_VALUE));
testObjectSerialization(Long.valueOf(Long.MAX_VALUE));
-
+
testObjectSerialization(Float.valueOf(0));
testObjectSerialization(Float.valueOf(1));
testObjectSerialization(Float.valueOf(-1));
- testObjectSerialization(Float.valueOf((float)Math.E));
- testObjectSerialization(Float.valueOf((float)Math.PI));
+ testObjectSerialization(Float.valueOf((float) Math.E));
+ testObjectSerialization(Float.valueOf((float) Math.PI));
testObjectSerialization(Float.valueOf(Float.MIN_VALUE));
testObjectSerialization(Float.valueOf(Float.MAX_VALUE));
testObjectSerialization(Float.valueOf(Float.MIN_NORMAL));
testObjectSerialization(Float.valueOf(Float.NaN));
testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY));
testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY));
-
+
testObjectSerialization(Double.valueOf(0));
testObjectSerialization(Double.valueOf(1));
testObjectSerialization(Double.valueOf(-1));
@@ -138,15 +141,15 @@ public class EncoderDecoderTest {
testObjectSerialization(Double.valueOf(Double.NaN));
testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY));
testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY));
-
+
testObjectSerialization("");
testObjectSerialization("abcdefg");
testObjectSerialization("ab\u1535\u0155xyz\u706F");
-
+
testObjectSerialization(new SimpleTypes(3637, 54876486548L, (byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523));
testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) -65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 0.0000001));
}
-
+
@Test
public void testArrayTypes() {
{
@@ -170,7 +173,7 @@ public class EncoderDecoderTest {
testObjectSerialization(array);
}
}
-
+
@Test
public void testEmptyArray() {
{
@@ -194,14 +197,14 @@ public class EncoderDecoderTest {
testObjectSerialization(array);
}
}
-
+
@Test
public void testObjects() {
// simple object containing only primitives
{
testObjectSerialization(new Book(976243875L, "The Serialization Odysse", 42));
}
-
+
// object with collection
{
ArrayList<String> list = new ArrayList<String>();
@@ -210,22 +213,22 @@ public class EncoderDecoderTest {
list.add("C");
list.add("D");
list.add("E");
-
+
testObjectSerialization(new BookAuthor(976243875L, list, "Arno Nym"));
}
-
+
// object with empty collection
{
ArrayList<String> list = new ArrayList<String>();
testObjectSerialization(new BookAuthor(987654321L, list, "The Saurus"));
}
}
-
+
@Test
public void testNestedObjectsWithCollections() {
testObjectSerialization(new ComplexNestedObject2(true));
}
-
+
@Test
public void testGeneratedObjectWithNullableFields() {
List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" });
@@ -243,33 +246,33 @@ public class EncoderDecoderTest {
User user = new User("Freudenreich", 1337, "macintosh gray",
1234567890L, 3.1415926, null, true, strings, bools, null,
Colors.GREEN, map, f, new Boolean(true), addr);
-
+
testObjectSerialization(user);
}
-
+
@Test
public void testVarLenCountEncoding() {
try {
long[] values = new long[] { 0, 1, 2, 3, 4, 0, 574, 45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL};
-
+
// write
ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
{
DataOutputStream dataOut = new DataOutputStream(baos);
-
+
for (long val : values) {
DataOutputEncoder.writeVarLongCount(dataOut, val);
}
-
+
dataOut.flush();
dataOut.close();
}
-
+
// read
{
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
DataInputStream dataIn = new DataInputStream(bais);
-
+
for (long val : values) {
long read = DataInputDecoder.readVarLongCount(dataIn);
assertEquals("Wrong var-len encoded value read.", val, read);
@@ -282,30 +285,30 @@ public class EncoderDecoderTest {
fail("Test failed due to an exception: " + e.getMessage());
}
}
-
+
private static <X> void testObjectSerialization(X obj) {
-
+
try {
-
+
// serialize
ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
{
DataOutputStream dataOut = new DataOutputStream(baos);
DataOutputEncoder encoder = new DataOutputEncoder();
encoder.setOut(dataOut);
-
+
@SuppressWarnings("unchecked")
Class<X> clazz = (Class<X>) obj.getClass();
ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz);
-
+
writer.write(obj, encoder);
dataOut.flush();
dataOut.close();
}
-
+
byte[] data = baos.toByteArray();
X result = null;
-
+
// deserialize
{
ByteArrayInputStream bais = new ByteArrayInputStream(data);
@@ -316,21 +319,21 @@ public class EncoderDecoderTest {
@SuppressWarnings("unchecked")
Class<X> clazz = (Class<X>) obj.getClass();
ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz);
-
- // create a reuse object if possible, otherwise we have no reuse object
+
+ // create a reuse object if possible, otherwise we have no reuse object
X reuse = null;
try {
@SuppressWarnings("unchecked")
X test = (X) obj.getClass().newInstance();
reuse = test;
} catch (Throwable t) {}
-
+
result = reader.read(reuse, decoder);
}
-
+
// check
final String message = "Deserialized object is not the same as the original";
-
+
if (obj.getClass().isArray()) {
Class<?> clazz = obj.getClass();
if (clazz == byte[].class) {
@@ -366,26 +369,24 @@ public class EncoderDecoderTest {
fail("Test failed due to an exception: " + e.getMessage());
}
}
-
+
// --------------------------------------------------------------------------------------------
// Test Objects
// --------------------------------------------------------------------------------------------
+ private static final class SimpleTypes {
- public static final class SimpleTypes {
-
private final int iVal;
private final long lVal;
private final byte bVal;
private final String sVal;
private final short rVal;
private final double dVal;
-
-
+
public SimpleTypes() {
this(0, 0, (byte) 0, "", (short) 0, 0);
}
-
+
public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
this.iVal = iVal;
this.lVal = lVal;
@@ -394,36 +395,36 @@ public class EncoderDecoderTest {
this.rVal = rVal;
this.dVal = dVal;
}
-
+
@Override
public boolean equals(Object obj) {
if (obj.getClass() == SimpleTypes.class) {
SimpleTypes other = (SimpleTypes) obj;
-
+
return other.iVal == this.iVal &&
other.lVal == this.lVal &&
other.bVal == this.bVal &&
other.sVal.equals(this.sVal) &&
other.rVal == this.rVal &&
other.dVal == this.dVal;
-
+
} else {
return false;
}
}
}
-
- public static class ComplexNestedObject1 {
-
+
+ private static class ComplexNestedObject1 {
+
private double doubleValue;
-
+
private List<String> stringList;
-
+
public ComplexNestedObject1() {}
-
+
public ComplexNestedObject1(int offInit) {
this.doubleValue = 6293485.6723 + offInit;
-
+
this.stringList = new ArrayList<String>();
this.stringList.add("A" + offInit);
this.stringList.add("somewhat" + offInit);
@@ -432,7 +433,7 @@ public class EncoderDecoderTest {
this.stringList.add("of" + offInit);
this.stringList.add("strings" + offInit);
}
-
+
@Override
public boolean equals(Object obj) {
if (obj.getClass() == ComplexNestedObject1.class) {
@@ -443,18 +444,18 @@ public class EncoderDecoderTest {
}
}
}
-
- public static class ComplexNestedObject2 {
-
+
+ private static class ComplexNestedObject2 {
+
private long longValue;
-
+
private Map<String, ComplexNestedObject1> theMap;
-
+
public ComplexNestedObject2() {}
-
+
public ComplexNestedObject2(boolean init) {
this.longValue = 46547;
-
+
this.theMap = new HashMap<String, ComplexNestedObject1>();
this.theMap.put("36354L", new ComplexNestedObject1(43546543));
this.theMap.put("785611L", new ComplexNestedObject1(45784568));
@@ -463,7 +464,7 @@ public class EncoderDecoderTest {
this.theMap.put("1919876876896L", new ComplexNestedObject1(27154));
this.theMap.put("-868468468L", new ComplexNestedObject1(546435));
}
-
+
@Override
public boolean equals(Object obj) {
if (obj.getClass() == ComplexNestedObject2.class) {
@@ -474,8 +475,8 @@ public class EncoderDecoderTest {
}
}
}
-
- public static class Book {
+
+ private static class Book {
private long bookId;
private String title;
@@ -488,7 +489,7 @@ public class EncoderDecoderTest {
this.title = title;
this.authorId = authorId;
}
-
+
@Override
public boolean equals(Object obj) {
if (obj.getClass() == Book.class) {
@@ -500,7 +501,7 @@ public class EncoderDecoderTest {
}
}
- public static class BookAuthor {
+ private static class BookAuthor {
private long authorId;
private List<String> bookTitles;
@@ -513,7 +514,7 @@ public class EncoderDecoderTest {
this.bookTitles = bookTitles;
this.authorName = authorName;
}
-
+
@Override
public boolean equals(Object obj) {
if (obj.getClass() == BookAuthor.class) {
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
index 1174786..a8541b6 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
@@ -16,18 +16,21 @@
* limitations under the License.
*/
-
package org.apache.flink.api.avro.testjar;
-// ================================================================================================
-// This file defines the classes for the AvroExternalJarProgramITCase.
-// The program is exported into src/test/resources/AvroTestProgram.jar.
-//
-// THIS FILE MUST STAY FULLY COMMENTED SUCH THAT THE HERE DEFINED CLASSES ARE NOT COMPILED
-// AND ADDED TO THE test-classes DIRECTORY. OTHERWISE, THE EXTERNAL CLASS LOADING WILL
-// NOT BE COVERED BY THIS TEST.
-// ================================================================================================
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
import java.io.File;
import java.io.IOException;
@@ -35,100 +38,90 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.core.fs.Path;
-
+/**
+ * This file defines the classes for the AvroExternalJarProgramITCase.
+ */
public class AvroExternalJarProgram {
- public static final class Color {
-
+ private static final class Color {
+
private String name;
private double saturation;
-
+
public Color() {
name = "";
saturation = 1.0;
}
-
+
public Color(String name, double saturation) {
this.name = name;
this.saturation = saturation;
}
-
+
public String getName() {
return name;
}
-
+
public void setName(String name) {
this.name = name;
}
-
+
public double getSaturation() {
return saturation;
}
-
+
public void setSaturation(double saturation) {
this.saturation = saturation;
}
-
+
@Override
public String toString() {
return name + '(' + saturation + ')';
}
}
-
- public static final class MyUser {
-
+
+ private static final class MyUser {
+
private String name;
private List<Color> colors;
-
+
public MyUser() {
name = "unknown";
colors = new ArrayList<Color>();
}
-
+
public MyUser(String name, List<Color> colors) {
this.name = name;
this.colors = colors;
}
-
+
public String getName() {
return name;
}
-
+
public List<Color> getColors() {
return colors;
}
-
+
public void setName(String name) {
this.name = name;
}
-
+
public void setColors(List<Color> colors) {
this.colors = colors;
}
-
+
@Override
public String toString() {
return name + " : " + colors;
}
}
-
+
// --------------------------------------------------------------------------------------------
-
+
// --------------------------------------------------------------------------------------------
-
- public static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> {
+
+ private static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> {
private static final long serialVersionUID = 1L;
@Override
@@ -137,8 +130,8 @@ public class AvroExternalJarProgram {
return new Tuple2<String, MyUser>(namePrefix, u);
}
}
-
- public static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> {
+
+ private static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> {
private static final long serialVersionUID = 1L;
@Override
@@ -150,52 +143,51 @@ public class AvroExternalJarProgram {
// --------------------------------------------------------------------------------------------
// Test Data
// --------------------------------------------------------------------------------------------
-
- public static final class Generator {
-
+
+ private static final class Generator {
+
private final Random rnd = new Random(2389756789345689276L);
-
+
public MyUser nextUser() {
return randomUser();
}
-
+
private MyUser randomUser() {
-
+
int numColors = rnd.nextInt(5);
ArrayList<Color> colors = new ArrayList<Color>(numColors);
for (int i = 0; i < numColors; i++) {
colors.add(new Color(randomString(), rnd.nextDouble()));
}
-
+
return new MyUser(randomString(), colors);
}
-
+
private String randomString() {
char[] c = new char[this.rnd.nextInt(20) + 5];
-
+
for (int i = 0; i < c.length; i++) {
c[i] = (char) (this.rnd.nextInt(150) + 40);
}
-
+
return new String(c);
}
}
-
+
public static void writeTestData(File testFile, int numRecords) throws IOException {
-
+
DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class);
DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter);
-
+
dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);
-
-
+
Generator generator = new Generator();
-
+
for (int i = 0; i < numRecords; i++) {
MyUser user = generator.nextUser();
dataFileWriter.append(user);
}
-
+
dataFileWriter.close();
}
@@ -203,17 +195,17 @@ public class AvroExternalJarProgram {
// String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath();
// writeTestData(new File(testDataFile), 50);
// }
-
+
public static void main(String[] args) throws Exception {
String inputPath = args[0];
-
+
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class));
-
+
DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper());
-
- result.output(new DiscardingOutputFormat<Tuple2<String,MyUser>>());
+
+ result.output(new DiscardingOutputFormat<Tuple2<String, MyUser>>());
env.execute();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
index f33f433..be968c5 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.io.avro;
import org.apache.flink.api.common.ExecutionConfig;
@@ -29,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -43,6 +45,9 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+/**
+ * Tests for the {@link AvroInputFormat} reading Pojos.
+ */
@RunWith(Parameterized.class)
public class AvroPojoTest extends MultipleProgramsTestBase {
public AvroPojoTest(TestExecutionMode mode) {
@@ -88,7 +93,6 @@ public class AvroPojoTest extends MultipleProgramsTestBase {
env.execute("Simple Avro read job");
-
expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
}
@@ -116,7 +120,6 @@ public class AvroPojoTest extends MultipleProgramsTestBase {
env.execute("Simple Avro read job");
-
expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
@@ -142,7 +145,6 @@ public class AvroPojoTest extends MultipleProgramsTestBase {
res.writeAsText(resultPath);
env.execute("Avro Key selection");
-
expected = "(Alyssa,1)\n(Charlie,1)\n";
}
@@ -163,7 +165,7 @@ public class AvroPojoTest extends MultipleProgramsTestBase {
}).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
@Override
public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
- for(User u : values) {
+ for (User u : values) {
out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
}
}
@@ -172,7 +174,6 @@ public class AvroPojoTest extends MultipleProgramsTestBase {
res.writeAsText(resultPath);
env.execute("Avro Key selection");
-
expected = "(Charlie,1)\n(Alyssa,1)\n";
}
@@ -202,16 +203,15 @@ public class AvroPojoTest extends MultipleProgramsTestBase {
res.writeAsText(resultPath);
env.execute("Avro Key selection");
-
expected = "(Charlie,1)\n(Alyssa,1)\n";
}
/**
- * Test some know fields for grouping on
+ * Test some known fields for grouping on.
*/
@Test
public void testAllFields() throws Exception {
- for(String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) {
+ for (String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) {
testField(fieldName);
}
}
@@ -228,7 +228,7 @@ public class AvroPojoTest extends MultipleProgramsTestBase {
DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
@Override
public void reduce(Iterable<User> values, Collector<Object> out) throws Exception {
- for(User u : values) {
+ for (User u : values) {
out.collect(u.get(fieldName));
}
}
@@ -240,11 +240,11 @@ public class AvroPojoTest extends MultipleProgramsTestBase {
ExecutionConfig ec = env.getConfig();
Assert.assertTrue(ec.getRegisteredKryoTypes().contains(org.apache.flink.api.io.avro.generated.Fixed16.class));
- if(fieldName.equals("name")) {
+ if (fieldName.equals("name")) {
expected = "Alyssa\nCharlie";
- } else if(fieldName.equals("type_enum")) {
+ } else if (fieldName.equals("type_enum")) {
expected = "GREEN\nRED\n";
- } else if(fieldName.equals("type_double_test")) {
+ } else if (fieldName.equals("type_double_test")) {
expected = "123.45\n1.337\n";
} else {
Assert.fail("Unknown field");
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
index 3b6ad63..7bff28a 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -18,18 +18,6 @@
package org.apache.flink.api.io.avro;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.avro.util.Utf8;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -46,6 +34,19 @@ import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -73,33 +74,31 @@ import static org.junit.Assert.assertTrue;
* http://avro.apache.org/docs/current/gettingstartedjava.html
*/
public class AvroRecordInputFormatTest {
-
+
public File testFile;
-
- final static String TEST_NAME = "Alyssa";
-
- final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
- final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
-
- final static boolean TEST_ARRAY_BOOLEAN_1 = true;
- final static boolean TEST_ARRAY_BOOLEAN_2 = false;
-
- final static Colors TEST_ENUM_COLOR = Colors.GREEN;
-
- final static String TEST_MAP_KEY1 = "KEY 1";
- final static long TEST_MAP_VALUE1 = 8546456L;
- final static String TEST_MAP_KEY2 = "KEY 2";
- final static long TEST_MAP_VALUE2 = 17554L;
-
- final static int TEST_NUM = 239;
- final static String TEST_STREET = "Baker Street";
- final static String TEST_CITY = "London";
- final static String TEST_STATE = "London";
- final static String TEST_ZIP = "NW1 6XE";
-
- private Schema userSchema = new User().getSchema();
+ static final String TEST_NAME = "Alyssa";
+
+ static final String TEST_ARRAY_STRING_1 = "ELEMENT 1";
+ static final String TEST_ARRAY_STRING_2 = "ELEMENT 2";
+
+ static final boolean TEST_ARRAY_BOOLEAN_1 = true;
+ static final boolean TEST_ARRAY_BOOLEAN_2 = false;
+
+ static final Colors TEST_ENUM_COLOR = Colors.GREEN;
+ static final String TEST_MAP_KEY1 = "KEY 1";
+ static final long TEST_MAP_VALUE1 = 8546456L;
+ static final String TEST_MAP_KEY2 = "KEY 2";
+ static final long TEST_MAP_VALUE2 = 17554L;
+
+ static final int TEST_NUM = 239;
+ static final String TEST_STREET = "Baker Street";
+ static final String TEST_CITY = "London";
+ static final String TEST_STATE = "London";
+ static final String TEST_ZIP = "NW1 6XE";
+
+ private Schema userSchema = new User().getSchema();
public static void writeTestFile(File testFile) throws IOException {
ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
@@ -113,7 +112,7 @@ public class AvroRecordInputFormatTest {
HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
-
+
Address addr = new Address();
addr.setNum(TEST_NUM);
addr.setStreet(TEST_STREET);
@@ -121,7 +120,6 @@ public class AvroRecordInputFormatTest {
addr.setState(TEST_STATE);
addr.setZip(TEST_ZIP);
-
User user1 = new User();
user1.setName(TEST_NAME);
@@ -162,13 +160,13 @@ public class AvroRecordInputFormatTest {
dataFileWriter.append(user2);
dataFileWriter.close();
}
+
@Before
public void createFiles() throws IOException {
testFile = File.createTempFile("AvroInputFormatTest", null);
writeTestFile(testFile);
}
-
/**
* Test if the AvroInputFormat is able to properly read data from an avro file.
* @throws IOException
@@ -176,45 +174,45 @@ public class AvroRecordInputFormatTest {
@Test
public void testDeserialisation() throws IOException {
Configuration parameters = new Configuration();
-
+
AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
-
+
format.configure(parameters);
FileInputSplit[] splits = format.createInputSplits(1);
assertEquals(splits.length, 1);
format.open(splits[0]);
-
+
User u = format.nextRecord(null);
assertNotNull(u);
-
+
String name = u.getName().toString();
assertNotNull("empty record", name);
assertEquals("name not equal", TEST_NAME, name);
-
+
// check arrays
List<CharSequence> sl = u.getTypeArrayString();
assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
-
+
List<Boolean> bl = u.getTypeArrayBoolean();
assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
-
+
// check enums
Colors enumValue = u.getTypeEnum();
assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
-
+
// check maps
Map<CharSequence, Long> lm = u.getTypeMap();
assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
-
+
assertFalse("expecting second element", format.reachedEnd());
assertNotNull("expecting second element", format.nextRecord(u));
-
+
assertNull(format.nextRecord(u));
assertTrue(format.reachedEnd());
-
+
format.close();
}
@@ -225,46 +223,46 @@ public class AvroRecordInputFormatTest {
@Test
public void testDeserialisationReuseAvroRecordFalse() throws IOException {
Configuration parameters = new Configuration();
-
+
AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
format.setReuseAvroValue(false);
-
+
format.configure(parameters);
FileInputSplit[] splits = format.createInputSplits(1);
assertEquals(splits.length, 1);
format.open(splits[0]);
-
+
User u = format.nextRecord(null);
assertNotNull(u);
-
+
String name = u.getName().toString();
assertNotNull("empty record", name);
assertEquals("name not equal", TEST_NAME, name);
-
+
// check arrays
List<CharSequence> sl = u.getTypeArrayString();
assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
-
+
List<Boolean> bl = u.getTypeArrayBoolean();
assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
-
+
// check enums
Colors enumValue = u.getTypeEnum();
assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
-
+
// check maps
Map<CharSequence, Long> lm = u.getTypeMap();
assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
-
+
assertFalse("expecting second element", format.reachedEnd());
assertNotNull("expecting second element", format.nextRecord(u));
-
+
assertNull(format.nextRecord(u));
assertTrue(format.reachedEnd());
-
+
format.close();
}
@@ -274,7 +272,7 @@ public class AvroRecordInputFormatTest {
* However, if generated classes are not available, one can also use GenericData.Record.
* It is an untyped key-value record which is using a schema to validate the correctness of the data.
*
- * It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead.
+ * <p>It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead.
*/
@Test
public void testDeserializeToGenericType() throws IOException {
@@ -284,7 +282,7 @@ public class AvroRecordInputFormatTest {
// initialize Record by reading it from disk (that's easier than creating it by hand)
GenericData.Record rec = new GenericData.Record(userSchema);
dataFileReader.next(rec);
-
+
// check if record has been read correctly
assertNotNull(rec);
assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
@@ -296,7 +294,7 @@ public class AvroRecordInputFormatTest {
ExecutionConfig ec = new ExecutionConfig();
Assert.assertEquals(GenericTypeInfo.class, te.getClass());
-
+
Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<Class<?>>());
TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
@@ -312,8 +310,7 @@ public class AvroRecordInputFormatTest {
GenericData.Record newRec;
try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
- new ByteArrayInputStream(out.toByteArray())))
- {
+ new ByteArrayInputStream(out.toByteArray()))) {
newRec = tser.deserialize(inView);
}
@@ -324,7 +321,7 @@ public class AvroRecordInputFormatTest {
assertEquals(null, newRec.get("type_long_test"));
}
}
-
+
/**
* This test validates proper serialization with specific (generated POJO) types.
*/
@@ -355,8 +352,7 @@ public class AvroRecordInputFormatTest {
User newRec;
try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
- new ByteArrayInputStream(out.toByteArray())))
- {
+ new ByteArrayInputStream(out.toByteArray()))) {
newRec = tser.deserialize(inView);
}
@@ -370,9 +366,8 @@ public class AvroRecordInputFormatTest {
/**
* Test if the AvroInputFormat is able to properly read data from an Avro
* file as a GenericRecord.
- *
- * @throws IOException,
- * if there is an exception
+ *
+ * @throws IOException if there is an exception
*/
@Test
public void testDeserialisationGenericRecord() throws IOException {
@@ -385,8 +380,8 @@ public class AvroRecordInputFormatTest {
}
/**
- * Helper method to test GenericRecord serialisation
- *
+ * Helper method to test GenericRecord serialisation.
+ *
* @param format
* the format to test
* @param parameters
@@ -441,10 +436,9 @@ public class AvroRecordInputFormatTest {
/**
* Test if the AvroInputFormat is able to properly read data from an avro
- * file as a GenericRecord
- *
- * @throws IOException,
- * if there is an error
+ * file as a GenericRecord.
+ *
+ * @throws IOException if there is an error
*/
@Test
public void testDeserialisationGenericRecordReuseAvroValueFalse() throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
index 37a83d1..6401a87 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
@@ -18,9 +18,6 @@
package org.apache.flink.api.io.avro;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.api.io.avro.generated.Address;
import org.apache.flink.api.io.avro.generated.Colors;
import org.apache.flink.api.io.avro.generated.Fixed16;
@@ -30,6 +27,10 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -49,56 +50,55 @@ import static org.junit.Assert.assertEquals;
* http://avro.apache.org/docs/current/gettingstartedjava.html
*/
public class AvroSplittableInputFormatTest {
-
+
private File testFile;
-
- final static String TEST_NAME = "Alyssa";
-
- final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
- final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
-
- final static boolean TEST_ARRAY_BOOLEAN_1 = true;
- final static boolean TEST_ARRAY_BOOLEAN_2 = false;
-
- final static Colors TEST_ENUM_COLOR = Colors.GREEN;
-
- final static String TEST_MAP_KEY1 = "KEY 1";
- final static long TEST_MAP_VALUE1 = 8546456L;
- final static String TEST_MAP_KEY2 = "KEY 2";
- final static long TEST_MAP_VALUE2 = 17554L;
-
- final static Integer TEST_NUM = new Integer(239);
- final static String TEST_STREET = "Baker Street";
- final static String TEST_CITY = "London";
- final static String TEST_STATE = "London";
- final static String TEST_ZIP = "NW1 6XE";
-
- final static int NUM_RECORDS = 5000;
+
+ static final String TEST_NAME = "Alyssa";
+
+ static final String TEST_ARRAY_STRING_1 = "ELEMENT 1";
+ static final String TEST_ARRAY_STRING_2 = "ELEMENT 2";
+
+ static final boolean TEST_ARRAY_BOOLEAN_1 = true;
+ static final boolean TEST_ARRAY_BOOLEAN_2 = false;
+
+ static final Colors TEST_ENUM_COLOR = Colors.GREEN;
+
+ static final String TEST_MAP_KEY1 = "KEY 1";
+ static final long TEST_MAP_VALUE1 = 8546456L;
+ static final String TEST_MAP_KEY2 = "KEY 2";
+ static final long TEST_MAP_VALUE2 = 17554L;
+
+ static final Integer TEST_NUM = new Integer(239);
+ static final String TEST_STREET = "Baker Street";
+ static final String TEST_CITY = "London";
+ static final String TEST_STATE = "London";
+ static final String TEST_ZIP = "NW1 6XE";
+
+ static final int NUM_RECORDS = 5000;
@Before
public void createFiles() throws IOException {
testFile = File.createTempFile("AvroSplittableInputFormatTest", null);
-
+
ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
stringArray.add(TEST_ARRAY_STRING_1);
stringArray.add(TEST_ARRAY_STRING_2);
-
+
ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
booleanArray.add(TEST_ARRAY_BOOLEAN_1);
booleanArray.add(TEST_ARRAY_BOOLEAN_2);
-
+
HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
-
+
Address addr = new Address();
addr.setNum(new Integer(TEST_NUM));
addr.setStreet(TEST_STREET);
addr.setCity(TEST_CITY);
addr.setState(TEST_STATE);
addr.setZip(TEST_ZIP);
-
-
+
User user1 = new User();
user1.setName(TEST_NAME);
user1.setFavoriteNumber(256);
@@ -109,29 +109,28 @@ public class AvroSplittableInputFormatTest {
user1.setTypeEnum(TEST_ENUM_COLOR);
user1.setTypeMap(longMap);
user1.setTypeNested(addr);
-
+
// Construct via builder
User user2 = User.newBuilder()
- .setName(TEST_NAME)
- .setFavoriteColor("blue")
- .setFavoriteNumber(null)
- .setTypeBoolTest(false)
- .setTypeDoubleTest(1.337d)
- .setTypeNullTest(null)
- .setTypeLongTest(1337L)
- .setTypeArrayString(new ArrayList<CharSequence>())
- .setTypeArrayBoolean(new ArrayList<Boolean>())
- .setTypeNullableArray(null)
- .setTypeEnum(Colors.RED)
- .setTypeMap(new HashMap<CharSequence, Long>())
- .setTypeFixed(new Fixed16())
- .setTypeUnion(123L)
+ .setName(TEST_NAME)
+ .setFavoriteColor("blue")
+ .setFavoriteNumber(null)
+ .setTypeBoolTest(false)
+ .setTypeDoubleTest(1.337d)
+ .setTypeNullTest(null)
+ .setTypeLongTest(1337L)
+ .setTypeArrayString(new ArrayList<CharSequence>())
+ .setTypeArrayBoolean(new ArrayList<Boolean>())
+ .setTypeNullableArray(null)
+ .setTypeEnum(Colors.RED)
+ .setTypeMap(new HashMap<CharSequence, Long>())
+ .setTypeFixed(new Fixed16())
+ .setTypeUnion(123L)
.setTypeNested(
Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
.setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
.build())
-
- .build();
+ .build();
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), testFile);
@@ -139,7 +138,7 @@ public class AvroSplittableInputFormatTest {
dataFileWriter.append(user2);
Random rnd = new Random(1337);
- for(int i = 0; i < NUM_RECORDS -2 ; i++) {
+ for (int i = 0; i < NUM_RECORDS - 2; i++) {
User user = new User();
user.setName(TEST_NAME + rnd.nextInt());
user.setFavoriteNumber(rnd.nextInt());
@@ -161,21 +160,21 @@ public class AvroSplittableInputFormatTest {
}
dataFileWriter.close();
}
-
+
@Test
public void testSplittedIF() throws IOException {
Configuration parameters = new Configuration();
-
+
AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
format.configure(parameters);
FileInputSplit[] splits = format.createInputSplits(4);
assertEquals(splits.length, 4);
int elements = 0;
- int elementsPerSplit[] = new int[4];
- for(int i = 0; i < splits.length; i++) {
+ int[] elementsPerSplit = new int[4];
+ for (int i = 0; i < splits.length; i++) {
format.open(splits[i]);
- while(!format.reachedEnd()) {
+ while (!format.reachedEnd()) {
User u = format.nextRecord(null);
Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
elements++;
@@ -205,15 +204,15 @@ public class AvroSplittableInputFormatTest {
assertEquals(splits.length, 4);
int elements = 0;
- int elementsPerSplit[] = new int[4];
- for(int i = 0; i < splits.length; i++) {
+ int[] elementsPerSplit = new int[4];
+ for (int i = 0; i < splits.length; i++) {
format.reopen(splits[i], format.getCurrentState());
- while(!format.reachedEnd()) {
+ while (!format.reachedEnd()) {
User u = format.nextRecord(null);
Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
elements++;
- if(format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
+ if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
// do the whole checkpoint-restore procedure and see if we pick up from where we left off.
Tuple2<Long, Long> state = format.getCurrentState();
@@ -251,15 +250,15 @@ public class AvroSplittableInputFormatTest {
assertEquals(splits.length, 4);
int elements = 0;
- int elementsPerSplit[] = new int[4];
- for(int i = 0; i < splits.length; i++) {
+ int[] elementsPerSplit = new int[4];
+ for (int i = 0; i < splits.length; i++) {
format.open(splits[i]);
- while(!format.reachedEnd()) {
+ while (!format.reachedEnd()) {
User u = format.nextRecord(null);
Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
elements++;
- if(format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
+ if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
// do the whole checkpoint-restore procedure and see if we pick up from where we left off.
Tuple2<Long, Long> state = format.getCurrentState();
@@ -305,12 +304,12 @@ public class AvroSplittableInputFormatTest {
int elementsPerSplit[] = new int[4];
int cnt = 0;
int i = 0;
- for(InputSplit s:sp) {
+ for (InputSplit s:sp) {
RecordReader<AvroWrapper<User>, NullWritable> r = format.getRecordReader(s, jf, new HadoopDummyReporter());
AvroWrapper<User> k = r.createKey();
NullWritable v = r.createValue();
- while(r.next(k,v)) {
+ while (r.next(k, v)) {
cnt++;
elementsPerSplit[i]++;
}
@@ -318,7 +317,7 @@ public class AvroSplittableInputFormatTest {
}
System.out.println("Status "+Arrays.toString(elementsPerSplit));
} **/
-
+
@After
public void deleteFiles() {
testFile.delete();
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
index 5a21691..96ffb7f 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
@@ -18,9 +18,6 @@
package org.apache.flink.api.io.avro.example;
-import java.io.IOException;
-import java.util.Random;
-
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.GenericInputFormat;
@@ -29,10 +26,15 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Example that shows how to use an Avro type in a program.
+ */
@SuppressWarnings("serial")
public class AvroTypeExample {
-
-
+
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -45,49 +47,45 @@ public class AvroTypeExample {
.reduceGroup(new ConcatenatingReducer())
.print();
}
-
-
- public static final class NumberExtractingMapper implements MapFunction<User, Tuple2<User, Integer>> {
-
+
+ private static final class NumberExtractingMapper implements MapFunction<User, Tuple2<User, Integer>> {
+
@Override
public Tuple2<User, Integer> map(User user) {
return new Tuple2<User, Integer>(user, user.getFavoriteNumber());
}
}
-
-
- public static final class ConcatenatingReducer implements GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> {
+
+ private static final class ConcatenatingReducer implements GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> {
@Override
public void reduce(Iterable<Tuple2<User, Integer>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
int number = 0;
StringBuilder colors = new StringBuilder();
-
+
for (Tuple2<User, Integer> u : values) {
number = u.f1;
colors.append(u.f0.getFavoriteColor()).append(" - ");
}
-
+
colors.setLength(colors.length() - 3);
out.collect(new Tuple2<Integer, String>(number, colors.toString()));
}
}
-
-
- public static final class UserGeneratingInputFormat extends GenericInputFormat<User> {
+
+ private static final class UserGeneratingInputFormat extends GenericInputFormat<User> {
private static final long serialVersionUID = 1L;
-
+
private static final int NUM = 100;
-
+
private final Random rnd = new Random(32498562304986L);
-
+
private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };
-
+
private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };
-
+
private int count;
-
@Override
public boolean reachedEnd() throws IOException {
@@ -97,7 +95,7 @@ public class AvroTypeExample {
@Override
public User nextRecord(User reuse) throws IOException {
count++;
-
+
User u = new User();
u.setName(NAMES[rnd.nextInt(NAMES.length)]);
u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
index e245026..5ae88ca 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.io;
import org.apache.flink.api.common.io.InputFormat;
@@ -26,9 +25,13 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;
+
import org.junit.Assert;
import org.junit.Test;
+/**
+ * Tests for the type extraction of the {@link AvroInputFormat}.
+ */
public class AvroInputFormatTypeExtractionTest {
@Test
@@ -42,7 +45,6 @@ public class AvroInputFormatTypeExtractionTest {
DataSet<MyAvroType> input = env.createInput(format);
TypeInformation<?> typeInfoDataSet = input.getType();
-
Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo);
Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo);
@@ -54,6 +56,9 @@ public class AvroInputFormatTypeExtractionTest {
}
}
+ /**
+ * Simple POJO used as the record type in the type extraction test.
+ */
public static final class MyAvroType {
public String theString;
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
index f843d3b..87334a7 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
@@ -18,9 +18,14 @@
package org.apache.flink.api.java.io;
-import static org.apache.flink.api.java.io.AvroOutputFormat.Codec;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.flink.api.io.avro.example.User;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.apache.avro.Schema;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -29,16 +34,12 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import org.apache.avro.Schema;
-import org.apache.flink.api.io.avro.example.User;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.junit.Test;
-import org.mockito.internal.util.reflection.Whitebox;
+import static org.apache.flink.api.java.io.AvroOutputFormat.Codec;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
- * Tests for {@link AvroOutputFormat}
+ * Tests for {@link AvroOutputFormat}.
*/
public class AvroOutputFormatTest {
@@ -116,12 +117,12 @@ public class AvroOutputFormatTest {
@Test
public void testCompression() throws Exception {
// given
- final Path outputPath = new Path(File.createTempFile("avro-output-file","avro").getAbsolutePath());
- final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(outputPath,User.class);
+ final Path outputPath = new Path(File.createTempFile("avro-output-file", "avro").getAbsolutePath());
+ final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(outputPath, User.class);
outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
- final Path compressedOutputPath = new Path(File.createTempFile("avro-output-file","compressed.avro").getAbsolutePath());
- final AvroOutputFormat<User> compressedOutputFormat = new AvroOutputFormat<>(compressedOutputPath,User.class);
+ final Path compressedOutputPath = new Path(File.createTempFile("avro-output-file", "compressed.avro").getAbsolutePath());
+ final AvroOutputFormat<User> compressedOutputFormat = new AvroOutputFormat<>(compressedOutputPath, User.class);
compressedOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
compressedOutputFormat.setCodec(Codec.SNAPPY);
@@ -144,9 +145,9 @@ public class AvroOutputFormatTest {
private void output(final AvroOutputFormat<User> outputFormat) throws IOException {
outputFormat.configure(new Configuration());
- outputFormat.open(1,1);
+ outputFormat.open(1, 1);
for (int i = 0; i < 100; i++) {
- outputFormat.writeRecord(new User("testUser",1,"blue"));
+ outputFormat.writeRecord(new User("testUser", 1, "blue"));
}
outputFormat.close();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/resources/avro/user.avsc b/flink-connectors/flink-avro/src/test/resources/avro/user.avsc
index 02c11af..ab8adf5 100644
--- a/flink-connectors/flink-avro/src/test/resources/avro/user.avsc
+++ b/flink-connectors/flink-avro/src/test/resources/avro/user.avsc
@@ -21,8 +21,8 @@
{"name": "type_double_test", "type": "double"},
{"name": "type_null_test", "type": ["null"]},
{"name": "type_bool_test", "type": ["boolean"]},
- {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}},
- {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}},
+ {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}},
+ {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}},
{"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null},
{"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}},
{"name": "type_map", "type": {"type": "map", "values": "long"}},