You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:50 UTC

[20/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-avro

[FLINK-6711] Activate strict checkstyle for flink-avro


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b58545ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b58545ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b58545ec

Branch: refs/heads/master
Commit: b58545ecde76ae88be11ebdc305adbd9b132d302
Parents: 1a3a5b6
Author: zentol <ch...@apache.org>
Authored: Fri May 26 00:15:34 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:35 2017 +0200

----------------------------------------------------------------------
 flink-connectors/flink-avro/pom.xml             |   2 +-
 .../apache/flink/api/avro/DataInputDecoder.java |  47 +++--
 .../flink/api/avro/DataOutputEncoder.java       |  47 +++--
 .../api/avro/FSDataInputStreamWrapper.java      |  13 +-
 .../flink/api/java/io/AvroInputFormat.java      |  40 ++---
 .../flink/api/java/io/AvroOutputFormat.java     |  30 ++--
 .../src/test/assembly/test-assembly.xml         |   4 +-
 .../api/avro/AvroExternalJarProgramITCase.java  |  14 +-
 .../flink/api/avro/AvroOutputFormatITCase.java  |  39 +++--
 .../flink/api/avro/EncoderDecoderTest.java      | 173 ++++++++++---------
 .../avro/testjar/AvroExternalJarProgram.java    | 132 +++++++-------
 .../apache/flink/api/io/avro/AvroPojoTest.java  |  24 +--
 .../api/io/avro/AvroRecordInputFormatTest.java  | 150 ++++++++--------
 .../io/avro/AvroSplittableInputFormatTest.java  | 133 +++++++-------
 .../api/io/avro/example/AvroTypeExample.java    |  46 +++--
 .../io/AvroInputFormatTypeExtractionTest.java   |   9 +-
 .../flink/api/java/io/AvroOutputFormatTest.java |  35 ++--
 .../src/test/resources/avro/user.avsc           |   4 +-
 18 files changed, 469 insertions(+), 473 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/pom.xml b/flink-connectors/flink-avro/pom.xml
index 5082924..d057177 100644
--- a/flink-connectors/flink-avro/pom.xml
+++ b/flink-connectors/flink-avro/pom.xml
@@ -19,7 +19,7 @@ under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-	
+
 	<modelVersion>4.0.0</modelVersion>
 
 	<parent>

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
index 59da4cb..870d66f 100644
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
@@ -18,20 +18,22 @@
 
 package org.apache.flink.api.avro;
 
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
 import org.apache.avro.io.Decoder;
 import org.apache.avro.util.Utf8;
 
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 
+/**
+ * A {@link Decoder} that reads from a {@link DataInput}.
+ */
 public class DataInputDecoder extends Decoder {
-	
+
 	private final Utf8 stringDecoder = new Utf8();
-	
+
 	private DataInput in;
-	
+
 	public void setIn(DataInput in) {
 		this.in = in;
 	}
@@ -39,10 +41,9 @@ public class DataInputDecoder extends Decoder {
 	// --------------------------------------------------------------------------------------------
 	// primitives
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public void readNull() {}
-	
 
 	@Override
 	public boolean readBoolean() throws IOException {
@@ -68,12 +69,12 @@ public class DataInputDecoder extends Decoder {
 	public double readDouble() throws IOException {
 		return in.readDouble();
 	}
-	
+
 	@Override
 	public int readEnum() throws IOException {
 		return readInt();
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// bytes
 	// --------------------------------------------------------------------------------------------
@@ -82,7 +83,7 @@ public class DataInputDecoder extends Decoder {
 	public void readFixed(byte[] bytes, int start, int length) throws IOException {
 		in.readFully(bytes, start, length);
 	}
-	
+
 	@Override
 	public ByteBuffer readBytes(ByteBuffer old) throws IOException {
 		int length = readInt();
@@ -97,34 +98,32 @@ public class DataInputDecoder extends Decoder {
 		result.limit(length);
 		return result;
 	}
-	
-	
+
 	@Override
 	public void skipFixed(int length) throws IOException {
 		skipBytes(length);
 	}
-	
+
 	@Override
 	public void skipBytes() throws IOException {
 		int num = readInt();
 		skipBytes(num);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// strings
 	// --------------------------------------------------------------------------------------------
-	
-	
+
 	@Override
 	public Utf8 readString(Utf8 old) throws IOException {
 		int length = readInt();
 		Utf8 result = (old != null ? old : new Utf8());
 		result.setByteLength(length);
-		
+
 		if (length > 0) {
 			in.readFully(result.getBytes(), 0, length);
 		}
-		
+
 		return result;
 	}
 
@@ -172,7 +171,7 @@ public class DataInputDecoder extends Decoder {
 	public long skipMap() throws IOException {
 		return readVarLongCount(in);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// union
 	// --------------------------------------------------------------------------------------------
@@ -181,17 +180,17 @@ public class DataInputDecoder extends Decoder {
 	public int readIndex() throws IOException {
 		return readInt();
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// utils
 	// --------------------------------------------------------------------------------------------
-	
+
 	private void skipBytes(int num) throws IOException {
 		while (num > 0) {
 			num -= in.skipBytes(num);
 		}
 	}
-	
+
 	public static long readVarLongCount(DataInput in) throws IOException {
 		long value = in.readUnsignedByte();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
index 0102cc1..beae330 100644
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
@@ -18,36 +18,35 @@
 
 package org.apache.flink.api.avro;
 
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
 import org.apache.avro.io.Encoder;
 import org.apache.avro.util.Utf8;
 
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 
+/**
+ * An {@link Encoder} that writes data to a {@link DataOutput}.
+ */
 public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
-	
+
 	private static final long serialVersionUID = 1L;
-	
+
 	private DataOutput out;
-	
-	
+
 	public void setOut(DataOutput out) {
 		this.out = out;
 	}
 
-
 	@Override
 	public void flush() throws IOException {}
 
 	// --------------------------------------------------------------------------------------------
 	// primitives
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public void writeNull() {}
-	
 
 	@Override
 	public void writeBoolean(boolean b) throws IOException {
@@ -73,13 +72,12 @@ public final class DataOutputEncoder extends Encoder implements java.io.Serializ
 	public void writeDouble(double d) throws IOException {
 		out.writeDouble(d);
 	}
-	
+
 	@Override
 	public void writeEnum(int e) throws IOException {
 		out.writeInt(e);
 	}
-	
-	
+
 	// --------------------------------------------------------------------------------------------
 	// bytes
 	// --------------------------------------------------------------------------------------------
@@ -88,7 +86,7 @@ public final class DataOutputEncoder extends Encoder implements java.io.Serializ
 	public void writeFixed(byte[] bytes, int start, int len) throws IOException {
 		out.write(bytes, start, len);
 	}
-	
+
 	@Override
 	public void writeBytes(byte[] bytes, int start, int len) throws IOException {
 		out.writeInt(len);
@@ -96,17 +94,17 @@ public final class DataOutputEncoder extends Encoder implements java.io.Serializ
 			out.write(bytes, start, len);
 		}
 	}
-	
+
 	@Override
 	public void writeBytes(ByteBuffer bytes) throws IOException {
 		int num = bytes.remaining();
 		out.writeInt(num);
-		
+
 		if (num > 0) {
 			writeFixed(bytes);
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// strings
 	// --------------------------------------------------------------------------------------------
@@ -116,11 +114,11 @@ public final class DataOutputEncoder extends Encoder implements java.io.Serializ
 		byte[] bytes = Utf8.getBytesFor(str);
 		writeBytes(bytes, 0, bytes.length);
 	}
-	
+
 	@Override
 	public void writeString(Utf8 utf8) throws IOException {
 		writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
-		
+
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -158,22 +156,21 @@ public final class DataOutputEncoder extends Encoder implements java.io.Serializ
 	// --------------------------------------------------------------------------------------------
 	// union
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public void writeIndex(int unionIndex) throws IOException {
 		out.writeInt(unionIndex);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// utils
 	// --------------------------------------------------------------------------------------------
-		
-	
+
 	public static void writeVarLongCount(DataOutput out, long val) throws IOException {
 		if (val < 0) {
 			throw new IOException("Illegal count (must be non-negative): " + val);
 		}
-		
+
 		while ((val & ~0x7FL) != 0) {
 			out.write(((int) val) | 0x80);
 			val >>>= 7;

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
index 709c4f1..19e4a89 100644
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
@@ -16,20 +16,19 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.avro;
 
-import java.io.Closeable;
-import java.io.IOException;
+import org.apache.flink.core.fs.FSDataInputStream;
 
 import org.apache.avro.file.SeekableInput;
-import org.apache.flink.core.fs.FSDataInputStream;
 
+import java.io.Closeable;
+import java.io.IOException;
 
 /**
- * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well)
- * 
- * The wrapper keeps track of the position in the data stream.
+ * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well).
+ *
+ * <p>The wrapper keeps track of the position in the data stream.
  */
 public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
 	private final FSDataInputStream stream;

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
index 73067c1..33105cc 100644
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -16,10 +16,19 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.io;
 
-import java.io.IOException;
+import org.apache.flink.api.avro.FSDataInputStreamWrapper;
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.SeekableInput;
@@ -28,19 +37,10 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.flink.api.common.io.CheckpointableInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
 
 /**
  * Provides a {@link FileInputFormat} for Avro records.
@@ -59,7 +59,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 	private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
 
 	private final Class<E> avroValueType;
-	
+
 	private boolean reuseAvroValue = true;
 
 	private transient DataFileReader<E> dataFileReader;
@@ -68,7 +68,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 
 	private transient long recordsReadSinceLastSync;
 
-	private long lastSync = -1l;
+	private long lastSync = -1L;
 
 	public AvroInputFormat(Path filePath, Class<E> type) {
 		super(filePath);
@@ -91,16 +91,16 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 	public void setUnsplittable(boolean unsplittable) {
 		this.unsplittable = unsplittable;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// Typing
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public TypeInformation<E> getProducedType() {
 		return TypeExtractor.getForClass(this.avroValueType);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// Input Format Methods
 	// --------------------------------------------------------------------------------------------
@@ -155,7 +155,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 
 		// if we start a new block, then register the event, and
 		// restart the counter.
-		if(dataFileReader.previousSync() != lastSync) {
+		if (dataFileReader.previousSync() != lastSync) {
 			lastSync = dataFileReader.previousSync();
 			recordsReadSinceLastSync = 0;
 		}
@@ -199,7 +199,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 			// open and read until the record we were before
 			// the checkpoint and discard the values
 			dataFileReader.seek(lastSync);
-			for(int i = 0; i < recordsReadSinceLastSync; i++) {
+			for (int i = 0; i < recordsReadSinceLastSync; i++) {
 				dataFileReader.next(null);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
index ae90362..aed40bf 100644
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -15,8 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.io;
 
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.Path;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
@@ -24,15 +29,16 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.fs.Path;
 
 import java.io.IOException;
 import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * {@link FileOutputFormat} for Avro records.
+ * @param <E>
+ */
 public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
 
 	/**
@@ -40,11 +46,11 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 	 */
 	public enum Codec {
 
-		NULL((byte)0, CodecFactory.nullCodec()),
-		SNAPPY((byte)1, CodecFactory.snappyCodec()),
-		BZIP2((byte)2, CodecFactory.bzip2Codec()),
-		DEFLATE((byte)3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
-		XZ((byte)4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));
+		NULL((byte) 0, CodecFactory.nullCodec()),
+		SNAPPY((byte) 1, CodecFactory.snappyCodec()),
+		BZIP2((byte) 2, CodecFactory.bzip2Codec()),
+		DEFLATE((byte) 3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
+		XZ((byte) 4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));
 
 		private byte codecByte;
 
@@ -80,7 +86,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 	private transient Schema userDefinedSchema = null;
 
 	private transient Codec codec = null;
-	
+
 	private transient DataFileWriter<E> dataFileWriter;
 
 	public AvroOutputFormat(Path filePath, Class<E> type) {
@@ -124,7 +130,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 		if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
 			datumWriter = new SpecificDatumWriter<E>(avroValueType);
 			try {
-				schema = ((org.apache.avro.specific.SpecificRecordBase)avroValueType.newInstance()).getSchema();
+				schema = ((org.apache.avro.specific.SpecificRecordBase) avroValueType.newInstance()).getSchema();
 			} catch (InstantiationException | IllegalAccessException e) {
 				throw new RuntimeException(e.getMessage());
 			}
@@ -152,7 +158,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 			out.writeByte(-1);
 		}
 
-		if(userDefinedSchema != null) {
+		if (userDefinedSchema != null) {
 			byte[] json = userDefinedSchema.toString().getBytes(ConfigConstants.DEFAULT_CHARSET);
 			out.writeInt(json.length);
 			out.write(json);
@@ -170,7 +176,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 		}
 
 		int length = in.readInt();
-		if(length != 0) {
+		if (length != 0) {
 			byte[] json = new byte[length];
 			in.readFully(json);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml b/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
index 0f4561a..0cbdbe1 100644
--- a/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
+++ b/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
@@ -26,11 +26,11 @@ under the License.
 	<fileSets>
 		<fileSet>
 			<directory>${project.build.testOutputDirectory}</directory>
-			<outputDirectory>/</outputDirectory>
+			<outputDirectory></outputDirectory>
 			<!--modify/add include to match your package(s) -->
 			<includes>
 				<include>org/apache/flink/api/avro/testjar/**</include>
 			</includes>
 		</fileSet>
 	</fileSets>
-</assembly>
\ No newline at end of file
+</assembly>

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index 063a363..7bcba04 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -18,21 +18,25 @@
 
 package org.apache.flink.api.avro;
 
-import java.io.File;
-import java.net.URL;
-import java.util.Collections;
-
+import org.apache.flink.api.avro.testjar.AvroExternalJarProgram;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+
+/**
+ * IT case for the {@link AvroExternalJarProgram}.
+ */
 public class AvroExternalJarProgramITCase extends TestLogger {
 
 	private static final String JAR_FILE = "maven-test-jar.jar";

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
index 3b01ccb..f630f41 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
@@ -18,24 +18,27 @@
 
 package org.apache.flink.api.avro;
 
-import org.junit.Assert;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.io.avro.example.User;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.io.AvroOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.junit.Assert;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * IT cases for the {@link AvroOutputFormat}.
+ */
 @SuppressWarnings("serial")
 public class AvroOutputFormatITCase extends JavaProgramTestBase {
 
@@ -57,7 +60,6 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 		outputPath2 = getTempDirPath("avro_output2");
 	}
 
-
 	@Override
 	protected void testProgram() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -129,11 +131,9 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 			Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult));
 		}
 
-
 	}
 
-
-	public final static class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
+	private static final class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
 
 		@Override
 		public User map(Tuple3<String, Integer, String> value) throws Exception {
@@ -141,7 +141,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 		}
 	}
 
-	public final static class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
+	private static final class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
 
 		@Override
 		public ReflectiveUser map(User value) throws Exception {
@@ -149,8 +149,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 		}
 	}
 
-	
-	public static class ReflectiveUser {
+	private static class ReflectiveUser {
 		private String name;
 		private int favoriteNumber;
 		private String favoriteColor;
@@ -162,13 +161,15 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 			this.favoriteNumber = favoriteNumber;
 			this.favoriteColor = favoriteColor;
 		}
-		
+
 		public String getName() {
 			return this.name;
 		}
+
 		public String getFavoriteColor() {
 			return this.favoriteColor;
 		}
+
 		public int getFavoriteNumber() {
 			return this.favoriteNumber;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
index c39db15..808c257 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
@@ -18,6 +18,16 @@
 
 package org.apache.flink.api.avro;
 
+import org.apache.flink.api.io.avro.generated.Address;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.Fixed16;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.junit.Test;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -29,16 +39,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.io.avro.generated.Address;
-import org.apache.flink.api.io.avro.generated.Colors;
-import org.apache.flink.api.io.avro.generated.Fixed16;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.util.StringUtils;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization.
@@ -48,32 +51,32 @@ public class EncoderDecoderTest {
 	public void testComplexStringsDirecty() {
 		try {
 			Random rnd = new Random(349712539451944123L);
-			
+
 			for (int i = 0; i < 10; i++) {
 				String testString = StringUtils.getRandomString(rnd, 10, 100);
-				
+
 				ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
 				{
 					DataOutputStream dataOut = new DataOutputStream(baos);
 					DataOutputEncoder encoder = new DataOutputEncoder();
 					encoder.setOut(dataOut);
-					
+
 					encoder.writeString(testString);
 					dataOut.flush();
 					dataOut.close();
 				}
-				
+
 				byte[] data = baos.toByteArray();
-				
+
 				// deserialize
 				{
 					ByteArrayInputStream bais = new ByteArrayInputStream(data);
 					DataInputStream dataIn = new DataInputStream(bais);
 					DataInputDecoder decoder = new DataInputDecoder();
 					decoder.setIn(dataIn);
-	
+
 					String deserialized = decoder.readString();
-					
+
 					assertEquals(testString, deserialized);
 				}
 			}
@@ -84,49 +87,49 @@ public class EncoderDecoderTest {
 			fail("Test failed due to an exception: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testPrimitiveTypes() {
-		
+
 		testObjectSerialization(new Boolean(true));
 		testObjectSerialization(new Boolean(false));
-		
+
 		testObjectSerialization(Byte.valueOf((byte) 0));
 		testObjectSerialization(Byte.valueOf((byte) 1));
 		testObjectSerialization(Byte.valueOf((byte) -1));
 		testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE));
 		testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE));
-		
+
 		testObjectSerialization(Short.valueOf((short) 0));
 		testObjectSerialization(Short.valueOf((short) 1));
 		testObjectSerialization(Short.valueOf((short) -1));
 		testObjectSerialization(Short.valueOf(Short.MIN_VALUE));
 		testObjectSerialization(Short.valueOf(Short.MAX_VALUE));
-		
+
 		testObjectSerialization(Integer.valueOf(0));
 		testObjectSerialization(Integer.valueOf(1));
 		testObjectSerialization(Integer.valueOf(-1));
 		testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE));
 		testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE));
-		
+
 		testObjectSerialization(Long.valueOf(0));
 		testObjectSerialization(Long.valueOf(1));
 		testObjectSerialization(Long.valueOf(-1));
 		testObjectSerialization(Long.valueOf(Long.MIN_VALUE));
 		testObjectSerialization(Long.valueOf(Long.MAX_VALUE));
-		
+
 		testObjectSerialization(Float.valueOf(0));
 		testObjectSerialization(Float.valueOf(1));
 		testObjectSerialization(Float.valueOf(-1));
-		testObjectSerialization(Float.valueOf((float)Math.E));
-		testObjectSerialization(Float.valueOf((float)Math.PI));
+		testObjectSerialization(Float.valueOf((float) Math.E));
+		testObjectSerialization(Float.valueOf((float) Math.PI));
 		testObjectSerialization(Float.valueOf(Float.MIN_VALUE));
 		testObjectSerialization(Float.valueOf(Float.MAX_VALUE));
 		testObjectSerialization(Float.valueOf(Float.MIN_NORMAL));
 		testObjectSerialization(Float.valueOf(Float.NaN));
 		testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY));
 		testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY));
-		
+
 		testObjectSerialization(Double.valueOf(0));
 		testObjectSerialization(Double.valueOf(1));
 		testObjectSerialization(Double.valueOf(-1));
@@ -138,15 +141,15 @@ public class EncoderDecoderTest {
 		testObjectSerialization(Double.valueOf(Double.NaN));
 		testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY));
 		testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY));
-		
+
 		testObjectSerialization("");
 		testObjectSerialization("abcdefg");
 		testObjectSerialization("ab\u1535\u0155xyz\u706F");
-		
+
 		testObjectSerialization(new SimpleTypes(3637, 54876486548L, (byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523));
 		testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) -65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 0.0000001));
 	}
-	
+
 	@Test
 	public void testArrayTypes() {
 		{
@@ -170,7 +173,7 @@ public class EncoderDecoderTest {
 			testObjectSerialization(array);
 		}
 	}
-	
+
 	@Test
 	public void testEmptyArray() {
 		{
@@ -194,14 +197,14 @@ public class EncoderDecoderTest {
 			testObjectSerialization(array);
 		}
 	}
-	
+
 	@Test
 	public void testObjects() {
 		// simple object containing only primitives
 		{
 			testObjectSerialization(new Book(976243875L, "The Serialization Odysse", 42));
 		}
-		
+
 		// object with collection
 		{
 			ArrayList<String> list = new ArrayList<String>();
@@ -210,22 +213,22 @@ public class EncoderDecoderTest {
 			list.add("C");
 			list.add("D");
 			list.add("E");
-			
+
 			testObjectSerialization(new BookAuthor(976243875L, list, "Arno Nym"));
 		}
-		
+
 		// object with empty collection
 		{
 			ArrayList<String> list = new ArrayList<String>();
 			testObjectSerialization(new BookAuthor(987654321L, list, "The Saurus"));
 		}
 	}
-	
+
 	@Test
 	public void testNestedObjectsWithCollections() {
 		testObjectSerialization(new ComplexNestedObject2(true));
 	}
-	
+
 	@Test
 	public void testGeneratedObjectWithNullableFields() {
 		List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" });
@@ -243,33 +246,33 @@ public class EncoderDecoderTest {
 		User user = new User("Freudenreich", 1337, "macintosh gray",
 				1234567890L, 3.1415926, null, true, strings, bools, null,
 				Colors.GREEN, map, f, new Boolean(true), addr);
-		
+
 		testObjectSerialization(user);
 	}
-	
+
 	@Test
 	public void testVarLenCountEncoding() {
 		try {
 			long[] values = new long[] { 0, 1, 2, 3, 4, 0, 574, 45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL};
-			
+
 			// write
 			ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
 			{
 				DataOutputStream dataOut = new DataOutputStream(baos);
-				
+
 				for (long val : values) {
 					DataOutputEncoder.writeVarLongCount(dataOut, val);
 				}
-				
+
 				dataOut.flush();
 				dataOut.close();
 			}
-			
+
 			// read
 			{
 				ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
 				DataInputStream dataIn = new DataInputStream(bais);
-				
+
 				for (long val : values) {
 					long read = DataInputDecoder.readVarLongCount(dataIn);
 					assertEquals("Wrong var-len encoded value read.", val, read);
@@ -282,30 +285,30 @@ public class EncoderDecoderTest {
 			fail("Test failed due to an exception: " + e.getMessage());
 		}
 	}
-	
+
 	private static <X> void testObjectSerialization(X obj) {
-		
+
 		try {
-			
+
 			// serialize
 			ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
 			{
 				DataOutputStream dataOut = new DataOutputStream(baos);
 				DataOutputEncoder encoder = new DataOutputEncoder();
 				encoder.setOut(dataOut);
-				
+
 				@SuppressWarnings("unchecked")
 				Class<X> clazz = (Class<X>) obj.getClass();
 				ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz);
-				
+
 				writer.write(obj, encoder);
 				dataOut.flush();
 				dataOut.close();
 			}
-			
+
 			byte[] data = baos.toByteArray();
 			X result = null;
-			
+
 			// deserialize
 			{
 				ByteArrayInputStream bais = new ByteArrayInputStream(data);
@@ -316,21 +319,21 @@ public class EncoderDecoderTest {
 				@SuppressWarnings("unchecked")
 				Class<X> clazz = (Class<X>) obj.getClass();
 				ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz);
-				
-				// create a reuse object if possible, otherwise we have no reuse object 
+
+				// create a reuse object if possible, otherwise we have no reuse object
 				X reuse = null;
 				try {
 					@SuppressWarnings("unchecked")
 					X test = (X) obj.getClass().newInstance();
 					reuse = test;
 				} catch (Throwable t) {}
-				
+
 				result = reader.read(reuse, decoder);
 			}
-			
+
 			// check
 			final String message = "Deserialized object is not the same as the original";
-			
+
 			if (obj.getClass().isArray()) {
 				Class<?> clazz = obj.getClass();
 				if (clazz == byte[].class) {
@@ -366,26 +369,24 @@ public class EncoderDecoderTest {
 			fail("Test failed due to an exception: " + e.getMessage());
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Test Objects
 	// --------------------------------------------------------------------------------------------
 
+	private static final class SimpleTypes {
 
-	public static final class SimpleTypes {
-		
 		private final int iVal;
 		private final long lVal;
 		private final byte bVal;
 		private final String sVal;
 		private final short rVal;
 		private final double dVal;
-		
-		
+
 		public SimpleTypes() {
 			this(0, 0, (byte) 0, "", (short) 0, 0);
 		}
-		
+
 		public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
 			this.iVal = iVal;
 			this.lVal = lVal;
@@ -394,36 +395,36 @@ public class EncoderDecoderTest {
 			this.rVal = rVal;
 			this.dVal = dVal;
 		}
-		
+
 		@Override
 		public boolean equals(Object obj) {
 			if (obj.getClass() == SimpleTypes.class) {
 				SimpleTypes other = (SimpleTypes) obj;
-				
+
 				return other.iVal == this.iVal &&
 						other.lVal == this.lVal &&
 						other.bVal == this.bVal &&
 						other.sVal.equals(this.sVal) &&
 						other.rVal == this.rVal &&
 						other.dVal == this.dVal;
-				
+
 			} else {
 				return false;
 			}
 		}
 	}
-	
-	public static class ComplexNestedObject1 {
-		
+
+	private static class ComplexNestedObject1 {
+
 		private double doubleValue;
-		
+
 		private List<String> stringList;
-		
+
 		public ComplexNestedObject1() {}
-		
+
 		public ComplexNestedObject1(int offInit) {
 			this.doubleValue = 6293485.6723 + offInit;
-				
+
 			this.stringList = new ArrayList<String>();
 			this.stringList.add("A" + offInit);
 			this.stringList.add("somewhat" + offInit);
@@ -432,7 +433,7 @@ public class EncoderDecoderTest {
 			this.stringList.add("of" + offInit);
 			this.stringList.add("strings" + offInit);
 		}
-		
+
 		@Override
 		public boolean equals(Object obj) {
 			if (obj.getClass() == ComplexNestedObject1.class) {
@@ -443,18 +444,18 @@ public class EncoderDecoderTest {
 			}
 		}
 	}
-	
-	public static class ComplexNestedObject2 {
-		
+
+	private static class ComplexNestedObject2 {
+
 		private long longValue;
-		
+
 		private Map<String, ComplexNestedObject1> theMap;
-		
+
 		public ComplexNestedObject2() {}
-		
+
 		public ComplexNestedObject2(boolean init) {
 			this.longValue = 46547;
-				
+
 			this.theMap = new HashMap<String, ComplexNestedObject1>();
 			this.theMap.put("36354L", new ComplexNestedObject1(43546543));
 			this.theMap.put("785611L", new ComplexNestedObject1(45784568));
@@ -463,7 +464,7 @@ public class EncoderDecoderTest {
 			this.theMap.put("1919876876896L", new ComplexNestedObject1(27154));
 			this.theMap.put("-868468468L", new ComplexNestedObject1(546435));
 		}
-		
+
 		@Override
 		public boolean equals(Object obj) {
 			if (obj.getClass() == ComplexNestedObject2.class) {
@@ -474,8 +475,8 @@ public class EncoderDecoderTest {
 			}
 		}
 	}
-	
-	public static class Book {
+
+	private static class Book {
 
 		private long bookId;
 		private String title;
@@ -488,7 +489,7 @@ public class EncoderDecoderTest {
 			this.title = title;
 			this.authorId = authorId;
 		}
-		
+
 		@Override
 		public boolean equals(Object obj) {
 			if (obj.getClass() == Book.class) {
@@ -500,7 +501,7 @@ public class EncoderDecoderTest {
 		}
 	}
 
-	public static class BookAuthor {
+	private static class BookAuthor {
 
 		private long authorId;
 		private List<String> bookTitles;
@@ -513,7 +514,7 @@ public class EncoderDecoderTest {
 			this.bookTitles = bookTitles;
 			this.authorName = authorName;
 		}
-		
+
 		@Override
 		public boolean equals(Object obj) {
 			if (obj.getClass() == BookAuthor.class) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
index 1174786..a8541b6 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
@@ -16,18 +16,21 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.avro.testjar;
 
-// ================================================================================================
-//  This file defines the classes for the AvroExternalJarProgramITCase.
-//  The program is exported into src/test/resources/AvroTestProgram.jar.
-//
-//  THIS FILE MUST STAY FULLY COMMENTED SUCH THAT THE HERE DEFINED CLASSES ARE NOT COMPILED
-//  AND ADDED TO THE test-classes DIRECTORY. OTHERWISE, THE EXTERNAL CLASS LOADING WILL
-//  NOT BE COVERED BY THIS TEST.
-// ================================================================================================
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
 
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
 
 import java.io.File;
 import java.io.IOException;
@@ -35,100 +38,90 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.core.fs.Path;
-
+/**
+ * This file defines the classes for the AvroExternalJarProgramITCase.
+ */
 public class AvroExternalJarProgram  {
 
-	public static final class Color {
-		
+	private static final class Color {
+
 		private String name;
 		private double saturation;
-		
+
 		public Color() {
 			name = "";
 			saturation = 1.0;
 		}
-		
+
 		public Color(String name, double saturation) {
 			this.name = name;
 			this.saturation = saturation;
 		}
-		
+
 		public String getName() {
 			return name;
 		}
-		
+
 		public void setName(String name) {
 			this.name = name;
 		}
-		
+
 		public double getSaturation() {
 			return saturation;
 		}
-		
+
 		public void setSaturation(double saturation) {
 			this.saturation = saturation;
 		}
-		
+
 		@Override
 		public String toString() {
 			return name + '(' + saturation + ')';
 		}
 	}
-	
-	public static final class MyUser {
-		
+
+	private static final class MyUser {
+
 		private String name;
 		private List<Color> colors;
-		
+
 		public MyUser() {
 			name = "unknown";
 			colors = new ArrayList<Color>();
 		}
-		
+
 		public MyUser(String name, List<Color> colors) {
 			this.name = name;
 			this.colors = colors;
 		}
-		
+
 		public String getName() {
 			return name;
 		}
-		
+
 		public List<Color> getColors() {
 			return colors;
 		}
-		
+
 		public void setName(String name) {
 			this.name = name;
 		}
-		
+
 		public void setColors(List<Color> colors) {
 			this.colors = colors;
 		}
-		
+
 		@Override
 		public String toString() {
 			return name + " : " + colors;
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	// --------------------------------------------------------------------------------------------
-	
-	public static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> {
+
+	private static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -137,8 +130,8 @@ public class AvroExternalJarProgram  {
 			return new Tuple2<String, MyUser>(namePrefix, u);
 		}
 	}
-	
-	public static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> {
+
+	private static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -150,52 +143,51 @@ public class AvroExternalJarProgram  {
 	// --------------------------------------------------------------------------------------------
 	//  Test Data
 	// --------------------------------------------------------------------------------------------
-	
-	public static final class Generator {
-		
+
+	private static final class Generator {
+
 		private final Random rnd = new Random(2389756789345689276L);
-		
+
 		public MyUser nextUser() {
 			return randomUser();
 		}
-		
+
 		private MyUser randomUser() {
-			
+
 			int numColors = rnd.nextInt(5);
 			ArrayList<Color> colors = new ArrayList<Color>(numColors);
 			for (int i = 0; i < numColors; i++) {
 				colors.add(new Color(randomString(), rnd.nextDouble()));
 			}
-			
+
 			return new MyUser(randomString(), colors);
 		}
-		
+
 		private String randomString() {
 			char[] c = new char[this.rnd.nextInt(20) + 5];
-			
+
 			for (int i = 0; i < c.length; i++) {
 				c[i] = (char) (this.rnd.nextInt(150) + 40);
 			}
-			
+
 			return new String(c);
 		}
 	}
-	
+
 	public static void writeTestData(File testFile, int numRecords) throws IOException {
-		
+
 		DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class);
 		DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter);
-		
+
 		dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);
-		
-		
+
 		Generator generator = new Generator();
-		
+
 		for (int i = 0; i < numRecords; i++) {
 			MyUser user = generator.nextUser();
 			dataFileWriter.append(user);
 		}
-		
+
 		dataFileWriter.close();
 	}
 
@@ -203,17 +195,17 @@ public class AvroExternalJarProgram  {
 //		String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath();
 //		writeTestData(new File(testDataFile), 50);
 //	}
-	
+
 	public static void main(String[] args) throws Exception {
 		String inputPath = args[0];
-		
+
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class));
-	
+
 		DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper());
-		
-		result.output(new DiscardingOutputFormat<Tuple2<String,MyUser>>());
+
+		result.output(new DiscardingOutputFormat<Tuple2<String, MyUser>>());
 		env.execute();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
index f33f433..be968c5 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.io.avro;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -29,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -43,6 +45,9 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * Tests for the {@link AvroInputFormat} reading POJOs.
+ */
 @RunWith(Parameterized.class)
 public class AvroPojoTest extends MultipleProgramsTestBase {
 	public AvroPojoTest(TestExecutionMode mode) {
@@ -88,7 +93,6 @@ public class AvroPojoTest extends MultipleProgramsTestBase {
 
 		env.execute("Simple Avro read job");
 
-
 		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
 					"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
 	}
@@ -116,7 +120,6 @@ public class AvroPojoTest extends MultipleProgramsTestBase {
 
 		env.execute("Simple Avro read job");
 
-
 		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
 					"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
 
@@ -142,7 +145,6 @@ public class AvroPojoTest extends MultipleProgramsTestBase {
 		res.writeAsText(resultPath);
 		env.execute("Avro Key selection");
 
-
 		expected = "(Alyssa,1)\n(Charlie,1)\n";
 	}
 
@@ -163,7 +165,7 @@ public class AvroPojoTest extends MultipleProgramsTestBase {
 		}).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
 			@Override
 			public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
-				for(User u : values) {
+				for (User u : values) {
 					out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
 				}
 			}
@@ -172,7 +174,6 @@ public class AvroPojoTest extends MultipleProgramsTestBase {
 		res.writeAsText(resultPath);
 		env.execute("Avro Key selection");
 
-
 		expected = "(Charlie,1)\n(Alyssa,1)\n";
 	}
 
@@ -202,16 +203,15 @@ public class AvroPojoTest extends MultipleProgramsTestBase {
 		res.writeAsText(resultPath);
 		env.execute("Avro Key selection");
 
-
 		expected = "(Charlie,1)\n(Alyssa,1)\n";
 	}
 
 	/**
-	 * Test some know fields for grouping on
+	 * Test some known fields for grouping on.
 	 */
 	@Test
 	public void testAllFields() throws Exception {
-		for(String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) {
+		for (String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) {
 			testField(fieldName);
 		}
 	}
@@ -228,7 +228,7 @@ public class AvroPojoTest extends MultipleProgramsTestBase {
 		DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
 			@Override
 			public void reduce(Iterable<User> values, Collector<Object> out) throws Exception {
-				for(User u : values) {
+				for (User u : values) {
 					out.collect(u.get(fieldName));
 				}
 			}
@@ -240,11 +240,11 @@ public class AvroPojoTest extends MultipleProgramsTestBase {
 		ExecutionConfig ec = env.getConfig();
 		Assert.assertTrue(ec.getRegisteredKryoTypes().contains(org.apache.flink.api.io.avro.generated.Fixed16.class));
 
-		if(fieldName.equals("name")) {
+		if (fieldName.equals("name")) {
 			expected = "Alyssa\nCharlie";
-		} else if(fieldName.equals("type_enum")) {
+		} else if (fieldName.equals("type_enum")) {
 			expected = "GREEN\nRED\n";
-		} else if(fieldName.equals("type_double_test")) {
+		} else if (fieldName.equals("type_double_test")) {
 			expected = "123.45\n1.337\n";
 		} else {
 			Assert.fail("Unknown field");

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
index 3b6ad63..7bff28a 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -18,18 +18,6 @@
 
 package org.apache.flink.api.io.avro;
 
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.avro.util.Utf8;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -46,6 +34,19 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -73,33 +74,31 @@ import static org.junit.Assert.assertTrue;
  * http://avro.apache.org/docs/current/gettingstartedjava.html
  */
 public class AvroRecordInputFormatTest {
-	
+
 	public File testFile;
-	
-	final static String TEST_NAME = "Alyssa";
-	
-	final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
-	final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
-	
-	final static boolean TEST_ARRAY_BOOLEAN_1 = true;
-	final static boolean TEST_ARRAY_BOOLEAN_2 = false;
-	
-	final static Colors TEST_ENUM_COLOR = Colors.GREEN;
-	
-	final static String TEST_MAP_KEY1 = "KEY 1";
-	final static long TEST_MAP_VALUE1 = 8546456L;
-	final static String TEST_MAP_KEY2 = "KEY 2";
-	final static long TEST_MAP_VALUE2 = 17554L;
-	
-	final static int TEST_NUM = 239;
-	final static String TEST_STREET = "Baker Street";
-	final static String TEST_CITY = "London";
-	final static String TEST_STATE = "London";
-	final static String TEST_ZIP = "NW1 6XE";
-	
 
-	private Schema userSchema = new User().getSchema();
+	static final String TEST_NAME = "Alyssa";
+
+	static final String TEST_ARRAY_STRING_1 = "ELEMENT 1";
+	static final String TEST_ARRAY_STRING_2 = "ELEMENT 2";
+
+	static final boolean TEST_ARRAY_BOOLEAN_1 = true;
+	static final boolean TEST_ARRAY_BOOLEAN_2 = false;
+
+	static final Colors TEST_ENUM_COLOR = Colors.GREEN;
 
+	static final String TEST_MAP_KEY1 = "KEY 1";
+	static final long TEST_MAP_VALUE1 = 8546456L;
+	static final String TEST_MAP_KEY2 = "KEY 2";
+	static final long TEST_MAP_VALUE2 = 17554L;
+
+	static final int TEST_NUM = 239;
+	static final String TEST_STREET = "Baker Street";
+	static final String TEST_CITY = "London";
+	static final String TEST_STATE = "London";
+	static final String TEST_ZIP = "NW1 6XE";
+
+	private Schema userSchema = new User().getSchema();
 
 	public static void writeTestFile(File testFile) throws IOException {
 		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
@@ -113,7 +112,7 @@ public class AvroRecordInputFormatTest {
 		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
 		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
 		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
-		
+
 		Address addr = new Address();
 		addr.setNum(TEST_NUM);
 		addr.setStreet(TEST_STREET);
@@ -121,7 +120,6 @@ public class AvroRecordInputFormatTest {
 		addr.setState(TEST_STATE);
 		addr.setZip(TEST_ZIP);
 
-
 		User user1 = new User();
 
 		user1.setName(TEST_NAME);
@@ -162,13 +160,13 @@ public class AvroRecordInputFormatTest {
 		dataFileWriter.append(user2);
 		dataFileWriter.close();
 	}
+
 	@Before
 	public void createFiles() throws IOException {
 		testFile = File.createTempFile("AvroInputFormatTest", null);
 		writeTestFile(testFile);
 	}
 
-
 	/**
 	 * Test if the AvroInputFormat is able to properly read data from an avro file.
 	 * @throws IOException
@@ -176,45 +174,45 @@ public class AvroRecordInputFormatTest {
 	@Test
 	public void testDeserialisation() throws IOException {
 		Configuration parameters = new Configuration();
-		
+
 		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
-		
+
 		format.configure(parameters);
 		FileInputSplit[] splits = format.createInputSplits(1);
 		assertEquals(splits.length, 1);
 		format.open(splits[0]);
-		
+
 		User u = format.nextRecord(null);
 		assertNotNull(u);
-		
+
 		String name = u.getName().toString();
 		assertNotNull("empty record", name);
 		assertEquals("name not equal", TEST_NAME, name);
-		
+
 		// check arrays
 		List<CharSequence> sl = u.getTypeArrayString();
 		assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
 		assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
-		
+
 		List<Boolean> bl = u.getTypeArrayBoolean();
 		assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
 		assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
-		
+
 		// check enums
 		Colors enumValue = u.getTypeEnum();
 		assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
-		
+
 		// check maps
 		Map<CharSequence, Long> lm = u.getTypeMap();
 		assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
 		assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
-		
+
 		assertFalse("expecting second element", format.reachedEnd());
 		assertNotNull("expecting second element", format.nextRecord(u));
-		
+
 		assertNull(format.nextRecord(u));
 		assertTrue(format.reachedEnd());
-		
+
 		format.close();
 	}
 
@@ -225,46 +223,46 @@ public class AvroRecordInputFormatTest {
 	@Test
 	public void testDeserialisationReuseAvroRecordFalse() throws IOException {
 		Configuration parameters = new Configuration();
-		
+
 		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
 		format.setReuseAvroValue(false);
-		
+
 		format.configure(parameters);
 		FileInputSplit[] splits = format.createInputSplits(1);
 		assertEquals(splits.length, 1);
 		format.open(splits[0]);
-		
+
 		User u = format.nextRecord(null);
 		assertNotNull(u);
-		
+
 		String name = u.getName().toString();
 		assertNotNull("empty record", name);
 		assertEquals("name not equal", TEST_NAME, name);
-		
+
 		// check arrays
 		List<CharSequence> sl = u.getTypeArrayString();
 		assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
 		assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
-		
+
 		List<Boolean> bl = u.getTypeArrayBoolean();
 		assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
 		assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
-		
+
 		// check enums
 		Colors enumValue = u.getTypeEnum();
 		assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
-		
+
 		// check maps
 		Map<CharSequence, Long> lm = u.getTypeMap();
 		assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
 		assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
-		
+
 		assertFalse("expecting second element", format.reachedEnd());
 		assertNotNull("expecting second element", format.nextRecord(u));
-		
+
 		assertNull(format.nextRecord(u));
 		assertTrue(format.reachedEnd());
-		
+
 		format.close();
 	}
 
@@ -274,7 +272,7 @@ public class AvroRecordInputFormatTest {
 	 * However, if generated classes are not available, one can also use GenericData.Record.
 	 * It is an untyped key-value record which is using a schema to validate the correctness of the data.
 	 *
-	 * It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead.
+	 * <p>It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead.
 	 */
 	@Test
 	public void testDeserializeToGenericType() throws IOException {
@@ -284,7 +282,7 @@ public class AvroRecordInputFormatTest {
 			// initialize Record by reading it from disk (that's easier than creating it by hand)
 			GenericData.Record rec = new GenericData.Record(userSchema);
 			dataFileReader.next(rec);
-			
+
 			// check if record has been read correctly
 			assertNotNull(rec);
 			assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
@@ -296,7 +294,7 @@ public class AvroRecordInputFormatTest {
 
 			ExecutionConfig ec = new ExecutionConfig();
 			Assert.assertEquals(GenericTypeInfo.class, te.getClass());
-			
+
 			Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<Class<?>>());
 
 			TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
@@ -312,8 +310,7 @@ public class AvroRecordInputFormatTest {
 
 			GenericData.Record newRec;
 			try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
-					new ByteArrayInputStream(out.toByteArray())))
-			{
+					new ByteArrayInputStream(out.toByteArray()))) {
 				newRec = tser.deserialize(inView);
 			}
 
@@ -324,7 +321,7 @@ public class AvroRecordInputFormatTest {
 			assertEquals(null, newRec.get("type_long_test"));
 		}
 	}
-		
+
 	/**
 	 * This test validates proper serialization with specific (generated POJO) types.
 	 */
@@ -355,8 +352,7 @@ public class AvroRecordInputFormatTest {
 
 			User newRec;
 			try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
-					new ByteArrayInputStream(out.toByteArray())))
-			{
+					new ByteArrayInputStream(out.toByteArray()))) {
 				newRec = tser.deserialize(inView);
 			}
 
@@ -370,9 +366,8 @@ public class AvroRecordInputFormatTest {
 	/**
 	 * Test if the AvroInputFormat is able to properly read data from an Avro
 	 * file as a GenericRecord.
-	 * 
-	 * @throws IOException,
-	 *             if there is an exception
+	 *
+	 * @throws IOException if there is an exception
 	 */
 	@Test
 	public void testDeserialisationGenericRecord() throws IOException {
@@ -385,8 +380,8 @@ public class AvroRecordInputFormatTest {
 	}
 
 	/**
-	 * Helper method to test GenericRecord serialisation
-	 * 
+	 * Helper method to test GenericRecord serialisation.
+	 *
 	 * @param format
 	 *            the format to test
 	 * @param parameters
@@ -441,10 +436,9 @@ public class AvroRecordInputFormatTest {
 
 	/**
 	 * Test if the AvroInputFormat is able to properly read data from an avro
-	 * file as a GenericRecord
-	 * 
-	 * @throws IOException,
-	 *             if there is an error
+	 * file as a GenericRecord.
+	 *
+	 * @throws IOException if there is an error
 	 */
 	@Test
 	public void testDeserialisationGenericRecordReuseAvroValueFalse() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
index 37a83d1..6401a87 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.api.io.avro;
 
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.flink.api.io.avro.generated.Address;
 import org.apache.flink.api.io.avro.generated.Colors;
 import org.apache.flink.api.io.avro.generated.Fixed16;
@@ -30,6 +27,10 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -49,56 +50,55 @@ import static org.junit.Assert.assertEquals;
  * http://avro.apache.org/docs/current/gettingstartedjava.html
  */
 public class AvroSplittableInputFormatTest {
-	
+
 	private File testFile;
-	
-	final static String TEST_NAME = "Alyssa";
-	
-	final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
-	final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
-	
-	final static boolean TEST_ARRAY_BOOLEAN_1 = true;
-	final static boolean TEST_ARRAY_BOOLEAN_2 = false;
-	
-	final static Colors TEST_ENUM_COLOR = Colors.GREEN;
-	
-	final static String TEST_MAP_KEY1 = "KEY 1";
-	final static long TEST_MAP_VALUE1 = 8546456L;
-	final static String TEST_MAP_KEY2 = "KEY 2";
-	final static long TEST_MAP_VALUE2 = 17554L;
-
-	final static Integer TEST_NUM = new Integer(239);
-	final static String TEST_STREET = "Baker Street";
-	final static String TEST_CITY = "London";
-	final static String TEST_STATE = "London";
-	final static String TEST_ZIP = "NW1 6XE";
-	
-	final static int NUM_RECORDS = 5000;
+
+	static final String TEST_NAME = "Alyssa";
+
+	static final String TEST_ARRAY_STRING_1 = "ELEMENT 1";
+	static final String TEST_ARRAY_STRING_2 = "ELEMENT 2";
+
+	static final boolean TEST_ARRAY_BOOLEAN_1 = true;
+	static final boolean TEST_ARRAY_BOOLEAN_2 = false;
+
+	static final Colors TEST_ENUM_COLOR = Colors.GREEN;
+
+	static final String TEST_MAP_KEY1 = "KEY 1";
+	static final long TEST_MAP_VALUE1 = 8546456L;
+	static final String TEST_MAP_KEY2 = "KEY 2";
+	static final long TEST_MAP_VALUE2 = 17554L;
+
+	static final Integer TEST_NUM = new Integer(239);
+	static final String TEST_STREET = "Baker Street";
+	static final String TEST_CITY = "London";
+	static final String TEST_STATE = "London";
+	static final String TEST_ZIP = "NW1 6XE";
+
+	static final int NUM_RECORDS = 5000;
 
 	@Before
 	public void createFiles() throws IOException {
 		testFile = File.createTempFile("AvroSplittableInputFormatTest", null);
-		
+
 		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
 		stringArray.add(TEST_ARRAY_STRING_1);
 		stringArray.add(TEST_ARRAY_STRING_2);
-		
+
 		ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
 		booleanArray.add(TEST_ARRAY_BOOLEAN_1);
 		booleanArray.add(TEST_ARRAY_BOOLEAN_2);
-		
+
 		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
 		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
 		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
-		
+
 		Address addr = new Address();
 		addr.setNum(new Integer(TEST_NUM));
 		addr.setStreet(TEST_STREET);
 		addr.setCity(TEST_CITY);
 		addr.setState(TEST_STATE);
 		addr.setZip(TEST_ZIP);
-		
-		
+
 		User user1 = new User();
 		user1.setName(TEST_NAME);
 		user1.setFavoriteNumber(256);
@@ -109,29 +109,28 @@ public class AvroSplittableInputFormatTest {
 		user1.setTypeEnum(TEST_ENUM_COLOR);
 		user1.setTypeMap(longMap);
 		user1.setTypeNested(addr);
-		
+
 		// Construct via builder
 		User user2 = User.newBuilder()
-		             .setName(TEST_NAME)
-		             .setFavoriteColor("blue")
-		             .setFavoriteNumber(null)
-		             .setTypeBoolTest(false)
-		             .setTypeDoubleTest(1.337d)
-		             .setTypeNullTest(null)
-		             .setTypeLongTest(1337L)
-		             .setTypeArrayString(new ArrayList<CharSequence>())
-		             .setTypeArrayBoolean(new ArrayList<Boolean>())
-		             .setTypeNullableArray(null)
-		             .setTypeEnum(Colors.RED)
-		             .setTypeMap(new HashMap<CharSequence, Long>())
-					 .setTypeFixed(new Fixed16())
-					 .setTypeUnion(123L)
+				.setName(TEST_NAME)
+				.setFavoriteColor("blue")
+				.setFavoriteNumber(null)
+				.setTypeBoolTest(false)
+				.setTypeDoubleTest(1.337d)
+				.setTypeNullTest(null)
+				.setTypeLongTest(1337L)
+				.setTypeArrayString(new ArrayList<CharSequence>())
+				.setTypeArrayBoolean(new ArrayList<Boolean>())
+				.setTypeNullableArray(null)
+				.setTypeEnum(Colors.RED)
+				.setTypeMap(new HashMap<CharSequence, Long>())
+				.setTypeFixed(new Fixed16())
+				.setTypeUnion(123L)
 				.setTypeNested(
 						Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
 								.setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
 								.build())
-
-		             .build();
+				.build();
 		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
 		DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
 		dataFileWriter.create(user1.getSchema(), testFile);
@@ -139,7 +138,7 @@ public class AvroSplittableInputFormatTest {
 		dataFileWriter.append(user2);
 
 		Random rnd = new Random(1337);
-		for(int i = 0; i < NUM_RECORDS -2 ; i++) {
+		for (int i = 0; i < NUM_RECORDS - 2; i++) {
 			User user = new User();
 			user.setName(TEST_NAME + rnd.nextInt());
 			user.setFavoriteNumber(rnd.nextInt());
@@ -161,21 +160,21 @@ public class AvroSplittableInputFormatTest {
 		}
 		dataFileWriter.close();
 	}
-	
+
 	@Test
 	public void testSplittedIF() throws IOException {
 		Configuration parameters = new Configuration();
-		
+
 		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
 
 		format.configure(parameters);
 		FileInputSplit[] splits = format.createInputSplits(4);
 		assertEquals(splits.length, 4);
 		int elements = 0;
-		int elementsPerSplit[] = new int[4];
-		for(int i = 0; i < splits.length; i++) {
+		int[] elementsPerSplit = new int[4];
+		for (int i = 0; i < splits.length; i++) {
 			format.open(splits[i]);
-			while(!format.reachedEnd()) {
+			while (!format.reachedEnd()) {
 				User u = format.nextRecord(null);
 				Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
 				elements++;
@@ -205,15 +204,15 @@ public class AvroSplittableInputFormatTest {
 		assertEquals(splits.length, 4);
 
 		int elements = 0;
-		int elementsPerSplit[] = new int[4];
-		for(int i = 0; i < splits.length; i++) {
+		int[] elementsPerSplit = new int[4];
+		for (int i = 0; i < splits.length; i++) {
 			format.reopen(splits[i], format.getCurrentState());
-			while(!format.reachedEnd()) {
+			while (!format.reachedEnd()) {
 				User u = format.nextRecord(null);
 				Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
 				elements++;
 
-				if(format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
+				if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
 
 					// do the whole checkpoint-restore procedure and see if we pick up from where we left off.
 					Tuple2<Long, Long> state = format.getCurrentState();
@@ -251,15 +250,15 @@ public class AvroSplittableInputFormatTest {
 		assertEquals(splits.length, 4);
 
 		int elements = 0;
-		int elementsPerSplit[] = new int[4];
-		for(int i = 0; i < splits.length; i++) {
+		int[] elementsPerSplit = new int[4];
+		for (int i = 0; i < splits.length; i++) {
 			format.open(splits[i]);
-			while(!format.reachedEnd()) {
+			while (!format.reachedEnd()) {
 				User u = format.nextRecord(null);
 				Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
 				elements++;
 
-				if(format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
+				if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
 
 					// do the whole checkpoint-restore procedure and see if we pick up from where we left off.
 					Tuple2<Long, Long> state = format.getCurrentState();
@@ -305,12 +304,12 @@ public class AvroSplittableInputFormatTest {
 		int elementsPerSplit[] = new int[4];
 		int cnt = 0;
 		int i = 0;
-		for(InputSplit s:sp) {
+		for (InputSplit s:sp) {
 			RecordReader<AvroWrapper<User>, NullWritable> r = format.getRecordReader(s, jf, new HadoopDummyReporter());
 			AvroWrapper<User> k = r.createKey();
 			NullWritable v = r.createValue();
 
-			while(r.next(k,v)) {
+			while (r.next(k, v)) {
 				cnt++;
 				elementsPerSplit[i]++;
 			}
@@ -318,7 +317,7 @@ public class AvroSplittableInputFormatTest {
 		}
 		System.out.println("Status "+Arrays.toString(elementsPerSplit));
 	} **/
-	
+
 	@After
 	public void deleteFiles() {
 		testFile.delete();

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
index 5a21691..96ffb7f 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.api.io.avro.example;
 
-import java.io.IOException;
-import java.util.Random;
-
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.io.GenericInputFormat;
@@ -29,10 +26,15 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Example that shows how to use an Avro type in a program.
+ */
 @SuppressWarnings("serial")
 public class AvroTypeExample {
-	
-	
+
 	public static void main(String[] args) throws Exception {
 
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -45,49 +47,45 @@ public class AvroTypeExample {
 			.reduceGroup(new ConcatenatingReducer())
 			.print();
 	}
-	
-	
-	public static final class NumberExtractingMapper implements MapFunction<User, Tuple2<User, Integer>> {
-		
+
+	private static final class NumberExtractingMapper implements MapFunction<User, Tuple2<User, Integer>> {
+
 		@Override
 		public Tuple2<User, Integer> map(User user) {
 			return new Tuple2<User, Integer>(user, user.getFavoriteNumber());
 		}
 	}
-	
-	
-	public static final class ConcatenatingReducer implements GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> {
+
+	private static final class ConcatenatingReducer implements GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> {
 
 		@Override
 		public void reduce(Iterable<Tuple2<User, Integer>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
 			int number = 0;
 			StringBuilder colors = new StringBuilder();
-			
+
 			for (Tuple2<User, Integer> u : values) {
 				number = u.f1;
 				colors.append(u.f0.getFavoriteColor()).append(" - ");
 			}
-			
+
 			colors.setLength(colors.length() - 3);
 			out.collect(new Tuple2<Integer, String>(number, colors.toString()));
 		}
 	}
-	
-	
-	public static final class UserGeneratingInputFormat extends GenericInputFormat<User> {
+
+	private static final class UserGeneratingInputFormat extends GenericInputFormat<User> {
 
 		private static final long serialVersionUID = 1L;
-		
+
 		private static final int NUM = 100;
-		
+
 		private final Random rnd = new Random(32498562304986L);
-		
+
 		private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };
-		
+
 		private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };
-		
+
 		private int count;
-		
 
 		@Override
 		public boolean reachedEnd() throws IOException {
@@ -97,7 +95,7 @@ public class AvroTypeExample {
 		@Override
 		public User nextRecord(User reuse) throws IOException {
 			count++;
-			
+
 			User u = new User();
 			u.setName(NAMES[rnd.nextInt(NAMES.length)]);
 			u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
index e245026..5ae88ca 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.api.common.io.InputFormat;
@@ -26,9 +25,13 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.Path;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+/**
+ * Tests for the type extraction of the {@link AvroInputFormat}.
+ */
 public class AvroInputFormatTypeExtractionTest {
 
 	@Test
@@ -42,7 +45,6 @@ public class AvroInputFormatTypeExtractionTest {
 			DataSet<MyAvroType> input = env.createInput(format);
 			TypeInformation<?> typeInfoDataSet = input.getType();
 
-
 			Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo);
 			Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo);
 
@@ -54,6 +56,9 @@ public class AvroInputFormatTypeExtractionTest {
 		}
 	}
 
+	/**
+	 * Test type.
+	 */
 	public static final class MyAvroType {
 
 		public String theString;

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
index f843d3b..87334a7 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
@@ -18,9 +18,14 @@
 
 package org.apache.flink.api.java.io;
 
-import static org.apache.flink.api.java.io.AvroOutputFormat.Codec;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.flink.api.io.avro.example.User;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.apache.avro.Schema;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -29,16 +34,12 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
-import org.apache.avro.Schema;
-import org.apache.flink.api.io.avro.example.User;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.junit.Test;
-import org.mockito.internal.util.reflection.Whitebox;
+import static org.apache.flink.api.java.io.AvroOutputFormat.Codec;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
- * Tests for {@link AvroOutputFormat}
+ * Tests for {@link AvroOutputFormat}.
  */
 public class AvroOutputFormatTest {
 
@@ -116,12 +117,12 @@ public class AvroOutputFormatTest {
 	@Test
 	public void testCompression() throws Exception {
 		// given
-		final Path outputPath = new Path(File.createTempFile("avro-output-file","avro").getAbsolutePath());
-		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(outputPath,User.class);
+		final Path outputPath = new Path(File.createTempFile("avro-output-file", "avro").getAbsolutePath());
+		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(outputPath, User.class);
 		outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
 
-		final Path compressedOutputPath = new Path(File.createTempFile("avro-output-file","compressed.avro").getAbsolutePath());
-		final AvroOutputFormat<User> compressedOutputFormat = new AvroOutputFormat<>(compressedOutputPath,User.class);
+		final Path compressedOutputPath = new Path(File.createTempFile("avro-output-file", "compressed.avro").getAbsolutePath());
+		final AvroOutputFormat<User> compressedOutputFormat = new AvroOutputFormat<>(compressedOutputPath, User.class);
 		compressedOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
 		compressedOutputFormat.setCodec(Codec.SNAPPY);
 
@@ -144,9 +145,9 @@ public class AvroOutputFormatTest {
 
 	private void output(final AvroOutputFormat<User> outputFormat) throws IOException {
 		outputFormat.configure(new Configuration());
-		outputFormat.open(1,1);
+		outputFormat.open(1, 1);
 		for (int i = 0; i < 100; i++) {
-			outputFormat.writeRecord(new User("testUser",1,"blue"));
+			outputFormat.writeRecord(new User("testUser", 1, "blue"));
 		}
 		outputFormat.close();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b58545ec/flink-connectors/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/resources/avro/user.avsc b/flink-connectors/flink-avro/src/test/resources/avro/user.avsc
index 02c11af..ab8adf5 100644
--- a/flink-connectors/flink-avro/src/test/resources/avro/user.avsc
+++ b/flink-connectors/flink-avro/src/test/resources/avro/user.avsc
@@ -21,8 +21,8 @@
      {"name": "type_double_test", "type": "double"},
      {"name": "type_null_test", "type": ["null"]},
      {"name": "type_bool_test", "type": ["boolean"]},
-     {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}},  
-     {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}}, 
+     {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}},
+     {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}},
      {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null},
      {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}},
      {"name": "type_map", "type": {"type": "map", "values": "long"}},