You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:35 UTC
[45/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
new file mode 100644
index 0000000..73067c1
--- /dev/null
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.io;
+
+import java.io.IOException;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.avro.FSDataInputStreamWrapper;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.InstantiationUtil;
+
+/**
+ * Provides a {@link FileInputFormat} for Avro records.
+ *
+ * <p>The format also implements {@link CheckpointableInputFormat}. Its checkpoint state is a
+ * {@link Tuple2} holding the position of the last sync marker recorded by this format
+ * ({@code f0}) and the number of records read since that marker ({@code f1}).
+ *
+ * @param <E>
+ *            the type of the result Avro record. If you specify
+ *            {@link GenericRecord} then the result will be returned as a
+ *            {@link GenericRecord}, so you do not have to know the schema ahead
+ *            of time.
+ */
+public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E>,
+	CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
+
+	/** The class of the records produced by this format; selects the Avro datum reader. */
+	private final Class<E> avroValueType;
+
+	/** Whether the instance handed to {@link #nextRecord(Object)} is reused for the next record. */
+	private boolean reuseAvroValue = true;
+
+	/** The Avro reader for the currently opened split. */
+	private transient DataFileReader<E> dataFileReader;
+
+	/** The end offset of the current split ({@code start + length}). */
+	private transient long end;
+
+	/** The number of records read since {@link #lastSync} last changed. */
+	private transient long recordsReadSinceLastSync;
+
+	/** The position of the last sync marker recorded, or {@code -1} if none has been recorded yet. */
+	private long lastSync = -1L;
+
+	/**
+	 * Creates an input format that reads Avro records of the given type from the given path.
+	 *
+	 * @param filePath the path of the file(s) to read
+	 * @param type the class of the records to produce
+	 */
+	public AvroInputFormat(Path filePath, Class<E> type) {
+		super(filePath);
+		this.avroValueType = type;
+	}
+
+	/**
+	 * Sets the flag whether to reuse the Avro value instance for all records.
+	 * By default, the input format reuses the Avro value.
+	 *
+	 * @param reuseAvroValue True, if the input format should reuse the Avro value instance, false otherwise.
+	 */
+	public void setReuseAvroValue(boolean reuseAvroValue) {
+		this.reuseAvroValue = reuseAvroValue;
+	}
+
+	/**
+	 * If set, the InputFormat will only read entire files.
+	 *
+	 * @param unsplittable True, if files must not be split, false otherwise.
+	 */
+	public void setUnsplittable(boolean unsplittable) {
+		this.unsplittable = unsplittable;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Typing
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public TypeInformation<E> getProducedType() {
+		return TypeExtractor.getForClass(this.avroValueType);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Input Format Methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Opens the given split, creates the Avro reader on the split's stream, syncs the reader
+	 * to the split's start offset and records the resulting sync position.
+	 *
+	 * @param split the split to read
+	 * @throws IOException if the split or the Avro file cannot be opened
+	 */
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+		dataFileReader = initReader(split);
+		dataFileReader.sync(split.getStart());
+		lastSync = dataFileReader.previousSync();
+	}
+
+	/**
+	 * Creates the Avro file reader for the given split and resets the per-split bookkeeping
+	 * ({@link #end} and {@link #recordsReadSinceLastSync}).
+	 *
+	 * <p>The datum reader is chosen from the record type: generic for {@link GenericRecord},
+	 * specific for subclasses of {@code SpecificRecordBase}, reflection-based otherwise.
+	 *
+	 * @param split the split that is being opened
+	 * @return the reader positioned on the already opened input stream
+	 * @throws IOException if the file status or the Avro header cannot be read
+	 */
+	private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
+		DatumReader<E> datumReader;
+
+		if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
+			datumReader = new GenericDatumReader<E>();
+		} else {
+			datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
+				? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
+		}
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Opening split {}", split);
+		}
+
+		// the wrapper needs the total file length, which is looked up via the file system
+		SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
+		// parameterized cast instead of a raw type; the local no longer shadows the field
+		DataFileReader<E> reader = (DataFileReader<E>) DataFileReader.openReader(in, datumReader);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Loaded SCHEMA: {}", reader.getSchema());
+		}
+
+		end = split.getStart() + split.getLength();
+		recordsReadSinceLastSync = 0;
+		return reader;
+	}
+
+	@Override
+	public boolean reachedEnd() throws IOException {
+		return !dataFileReader.hasNext() || dataFileReader.pastSync(end);
+	}
+
+	/**
+	 * Returns the number of records read since the last recorded sync position changed.
+	 *
+	 * @return the current value of the per-block record counter
+	 */
+	public long getRecordsReadFromBlock() {
+		return this.recordsReadSinceLastSync;
+	}
+
+	/**
+	 * Reads the next record of the split.
+	 *
+	 * @param reuseValue the instance to read into when value reuse is enabled
+	 * @return the next record, or {@code null} if the end of the split has been reached
+	 * @throws IOException if the record cannot be read
+	 */
+	@Override
+	public E nextRecord(E reuseValue) throws IOException {
+		if (reachedEnd()) {
+			return null;
+		}
+
+		// if we start a new block, then register the event, and
+		// restart the counter.
+		if (dataFileReader.previousSync() != lastSync) {
+			lastSync = dataFileReader.previousSync();
+			recordsReadSinceLastSync = 0;
+		}
+		recordsReadSinceLastSync++;
+
+		if (reuseAvroValue) {
+			return dataFileReader.next(reuseValue);
+		} else {
+			if (GenericRecord.class == avroValueType) {
+				return dataFileReader.next();
+			} else {
+				return dataFileReader.next(InstantiationUtil.instantiate(avroValueType, Object.class));
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Checkpointing
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Returns the current reading position.
+	 *
+	 * @return a tuple of the last recorded sync position and the number of records read since then
+	 */
+	@Override
+	public Tuple2<Long, Long> getCurrentState() throws IOException {
+		return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
+	}
+
+	/**
+	 * Opens the split and restores the reading position described by the given state.
+	 *
+	 * @param split the split to open; must not be {@code null}
+	 * @param state the state to restore, as returned by {@link #getCurrentState()}; must not be
+	 *              {@code null}. A sync position of {@code -1} means that no position is restored.
+	 * @throws IOException if the split cannot be opened or the position cannot be restored
+	 */
+	@Override
+	public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
+		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
+		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
+
+		try {
+			this.open(split);
+		} finally {
+			// open() overwrites lastSync and resets the record counter,
+			// so the checkpointed values are re-applied afterwards
+			if (state.f0 != -1) {
+				lastSync = state.f0;
+				recordsReadSinceLastSync = state.f1;
+			}
+		}
+
+		if (lastSync != -1) {
+			// open and read until the record we were before
+			// the checkpoint and discard the values
+			dataFileReader.seek(lastSync);
+			// the counter is a long; an int index would overflow for very large blocks
+			for (long i = 0; i < recordsReadSinceLastSync; i++) {
+				dataFileReader.next(null);
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
new file mode 100644
index 0000000..600d1e5
--- /dev/null
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.core.fs.Path;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A {@link FileOutputFormat} that writes records into Avro data files.
+ *
+ * <p>Subclasses of {@code SpecificRecordBase} are written with a {@link SpecificDatumWriter},
+ * all other types with a {@link ReflectDatumWriter}. A user-defined schema and a compression
+ * codec may be set optionally; both are held in transient fields and are written and restored
+ * manually by {@code writeObject} / {@code readObject}.
+ *
+ * @param <E> the type of the records to write
+ */
+public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
+
+	/**
+	 * Wrapper which encapsulates the supported codec and a related serialization byte.
+	 */
+	public enum Codec {
+
+		NULL((byte)0, CodecFactory.nullCodec()),
+		SNAPPY((byte)1, CodecFactory.snappyCodec()),
+		BZIP2((byte)2, CodecFactory.bzip2Codec()),
+		DEFLATE((byte)3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
+		XZ((byte)4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));
+
+		/** The byte that identifies this codec in the serialized form of the output format. */
+		private final byte codecByte;
+
+		/** The Avro factory that is handed to the data file writer. */
+		private final CodecFactory codecFactory;
+
+		Codec(final byte codecByte, final CodecFactory codecFactory) {
+			this.codecByte = codecByte;
+			this.codecFactory = codecFactory;
+		}
+
+		private byte getCodecByte() {
+			return codecByte;
+		}
+
+		private CodecFactory getCodecFactory() {
+			return codecFactory;
+		}
+
+		/**
+		 * Looks up the codec that belongs to the given serialization byte.
+		 *
+		 * @param codecByte the byte read from the serialized form
+		 * @return the matching codec
+		 * @throws IllegalArgumentException if no codec uses the given byte
+		 */
+		private static Codec forCodecByte(byte codecByte) {
+			for (final Codec codec : Codec.values()) {
+				if (codec.getCodecByte() == codecByte) {
+					return codec;
+				}
+			}
+			throw new IllegalArgumentException("no codec for codecByte: " + codecByte);
+		}
+	}
+
+	private static final long serialVersionUID = 1L;
+
+	/** The class of the records to write; selects the Avro datum writer. */
+	private final Class<E> avroValueType;
+
+	/** Optional schema that replaces the schema derived from the record type. */
+	private transient Schema userDefinedSchema = null;
+
+	/** Optional compression codec; the writer's codec is left untouched when this is {@code null}. */
+	private transient Codec codec = null;
+
+	/** The Avro writer, created in {@link #open(int, int)}. */
+	private transient DataFileWriter<E> dataFileWriter;
+
+	/**
+	 * Creates an output format that writes records of the given type to the given path.
+	 *
+	 * @param filePath the path to write to
+	 * @param type the class of the records to write
+	 */
+	public AvroOutputFormat(Path filePath, Class<E> type) {
+		super(filePath);
+		this.avroValueType = type;
+	}
+
+	/**
+	 * Creates an output format for records of the given type without setting an output path.
+	 *
+	 * @param type the class of the records to write
+	 */
+	public AvroOutputFormat(Class<E> type) {
+		this.avroValueType = type;
+	}
+
+	/** Appends the {@code .avro} extension to the file name used when writing into a directory. */
+	@Override
+	protected String getDirectoryFileName(int taskNumber) {
+		return super.getDirectoryFileName(taskNumber) + ".avro";
+	}
+
+	/**
+	 * Sets the schema to write with instead of the schema derived from the record type.
+	 *
+	 * @param schema the schema to use; {@code null} falls back to the derived schema
+	 */
+	public void setSchema(Schema schema) {
+		this.userDefinedSchema = schema;
+	}
+
+	/**
+	 * Set avro codec for compression.
+	 *
+	 * @param codec avro codec.
+	 */
+	public void setCodec(final Codec codec) {
+		this.codec = checkNotNull(codec, "codec can not be null");
+	}
+
+	@Override
+	public void writeRecord(E record) throws IOException {
+		dataFileWriter.append(record);
+	}
+
+	/**
+	 * Opens the output stream and creates the Avro data file writer on top of it.
+	 *
+	 * @param taskNumber the number of the parallel task
+	 * @param numTasks the total number of parallel tasks
+	 * @throws IOException if the stream or the Avro file cannot be created
+	 * @throws RuntimeException if a specific record type cannot be instantiated to obtain its schema
+	 */
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		super.open(taskNumber, numTasks);
+
+		DatumWriter<E> datumWriter;
+		Schema schema;
+		if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
+			datumWriter = new SpecificDatumWriter<E>(avroValueType);
+			try {
+				schema = ((org.apache.avro.specific.SpecificRecordBase)avroValueType.newInstance()).getSchema();
+			} catch (InstantiationException | IllegalAccessException e) {
+				// keep the cause so that the failing constructor shows up in the stack trace
+				throw new RuntimeException(e.getMessage(), e);
+			}
+		} else {
+			datumWriter = new ReflectDatumWriter<E>(avroValueType);
+			schema = ReflectData.get().getSchema(avroValueType);
+		}
+		dataFileWriter = new DataFileWriter<E>(datumWriter);
+		if (codec != null) {
+			dataFileWriter.setCodec(codec.getCodecFactory());
+		}
+		if (userDefinedSchema == null) {
+			dataFileWriter.create(schema, stream);
+		} else {
+			dataFileWriter.create(userDefinedSchema, stream);
+		}
+	}
+
+	/**
+	 * Writes the default serializable fields followed by the transient configuration:
+	 * one byte for the codec ({@code -1} if none is set), then the length of the schema's
+	 * JSON representation as an int ({@code 0} if no schema is set) and the JSON bytes.
+	 */
+	private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+		out.defaultWriteObject();
+
+		if (codec != null) {
+			out.writeByte(codec.getCodecByte());
+		} else {
+			out.writeByte(-1);
+		}
+
+		if (userDefinedSchema != null) {
+			// fixed charset: the reading JVM may have a different platform default
+			byte[] json = userDefinedSchema.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+			out.writeInt(json.length);
+			out.write(json);
+		} else {
+			out.writeInt(0);
+		}
+	}
+
+	/**
+	 * Reads the default serializable fields and restores the codec and the schema from the
+	 * layout produced by {@code writeObject}.
+	 */
+	private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+
+		byte codecByte = in.readByte();
+		if (codecByte >= 0) {
+			setCodec(Codec.forCodecByte(codecByte));
+		}
+
+		int length = in.readInt();
+		if (length != 0) {
+			byte[] json = new byte[length];
+			in.readFully(json);
+
+			// must match the charset used in writeObject
+			Schema schema = new Schema.Parser().parse(new String(json, java.nio.charset.StandardCharsets.UTF_8));
+			setSchema(schema);
+		}
+	}
+
+	/**
+	 * Flushes and closes the Avro writer and then closes the underlying output format.
+	 *
+	 * @throws IOException if flushing or closing fails
+	 */
+	@Override
+	public void close() throws IOException {
+		try {
+			// the writer is null if open() failed before it was created
+			if (dataFileWriter != null) {
+				dataFileWriter.flush();
+				dataFileWriter.close();
+			}
+		} finally {
+			// always release the stream, even if flushing the writer failed
+			super.close();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml b/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
new file mode 100644
index 0000000..0f4561a
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<assembly>
+ <id>test-jar</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.testOutputDirectory}</directory>
+ <outputDirectory>/</outputDirectory>
+ <!-- Modify/add includes to match your package(s). -->
+ <includes>
+ <include>org/apache/flink/api/avro/testjar/**</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
new file mode 100644
index 0000000..1030ff8
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.File;
+
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AvroExternalJarProgramITCase {
+
+ private static final String JAR_FILE = "maven-test-jar.jar";
+
+ private static final String TEST_DATA_FILE = "/testdata.avro";
+
+ @Test
+ public void testExternalProgram() {
+
+ LocalFlinkMiniCluster testMiniCluster = null;
+
+ try {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+ testMiniCluster = new LocalFlinkMiniCluster(config, false);
+ testMiniCluster.start();
+
+ String jarFile = JAR_FILE;
+ String testData = getClass().getResource(TEST_DATA_FILE).toString();
+
+ PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+
+
+ config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+ config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());
+
+ ClusterClient client = new StandaloneClusterClient(config);
+
+ client.setPrintStatusDuringExecution(false);
+ client.run(program, 4);
+
+ }
+ catch (Throwable t) {
+ System.err.println(t.getMessage());
+ t.printStackTrace();
+ Assert.fail("Error during the packaged program execution: " + t.getMessage());
+ }
+ finally {
+ if (testMiniCluster != null) {
+ try {
+ testMiniCluster.stop();
+ } catch (Throwable t) {
+ // ignore
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
new file mode 100644
index 0000000..3b01ccb
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import org.junit.Assert;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.io.avro.example.User;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Integration test for {@link AvroOutputFormat}: writes the same user data once as the
+ * specific Avro type {@link User} and once as the reflection-based {@link ReflectiveUser},
+ * then reads both outputs back and compares them with the input.
+ */
+@SuppressWarnings("serial")
+public class AvroOutputFormatITCase extends JavaProgramTestBase {
+
+	public static String outputPath1;
+
+	public static String outputPath2;
+
+	public static String inputPath;
+
+	public static String userData = "alice|1|blue\n" +
+		"bob|2|red\n" +
+		"john|3|yellow\n" +
+		"walt|4|black\n";
+
+	/** Creates the input file and determines the two output locations. */
+	@Override
+	protected void preSubmit() throws Exception {
+		inputPath = createTempFile("user", userData);
+		outputPath1 = getTempDirPath("avro_output1");
+		outputPath2 = getTempDirPath("avro_output2");
+	}
+
+
+	/** Reads the pipe-delimited input and writes it with both flavours of the output format. */
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
+			.fieldDelimiter("|")
+			.types(String.class, Integer.class, String.class);
+
+		//output the data with AvroOutputFormat for specific user type
+		DataSet<User> specificUser = input.map(new ConvertToUser());
+		AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<User>(User.class);
+		avroOutputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY); // FLINK-4771: use a codec
+		avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure the OF is properly serializing the schema
+		specificUser.write(avroOutputFormat, outputPath1);
+
+		//output the data with AvroOutputFormat for reflect user type
+		DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
+		reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
+
+		env.execute();
+	}
+
+	/**
+	 * Reads both outputs back with plain Avro readers and checks that every input line is
+	 * contained in each of them. For the first output, files inside an output directory are
+	 * additionally required to carry the {@code .avro} extension.
+	 */
+	@Override
+	protected void postSubmit() throws Exception {
+		//compare result for specific user type
+		File [] output1;
+		File file1 = asFile(outputPath1);
+		if (file1.isDirectory()) {
+			output1 = file1.listFiles();
+			// check for avro ext in dir.
+			for (File avroOutput : output1) {
+				Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
+			}
+		} else {
+			output1 = new File[] {file1};
+		}
+		List<String> result1 = new ArrayList<String>();
+		DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
+		for (File avroOutput : output1) {
+			// close the reader so that the file handle is released after reading
+			try (DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1)) {
+				while (dataFileReader1.hasNext()) {
+					User user = dataFileReader1.next();
+					result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
+				}
+			}
+		}
+		for (String expectedResult : userData.split("\n")) {
+			Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult));
+		}
+
+		//compare result for reflect user type
+		File [] output2;
+		File file2 = asFile(outputPath2);
+		if (file2.isDirectory()) {
+			output2 = file2.listFiles();
+		} else {
+			output2 = new File[] {file2};
+		}
+		List<String> result2 = new ArrayList<String>();
+		DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
+		for (File avroOutput : output2) {
+			// close the reader so that the file handle is released after reading
+			try (DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2)) {
+				while (dataFileReader2.hasNext()) {
+					ReflectiveUser user = dataFileReader2.next();
+					result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
+				}
+			}
+		}
+		for (String expectedResult : userData.split("\n")) {
+			Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult));
+		}
+
+
+	}
+
+
+	/** Maps a (name, favorite number, favorite color) tuple to the generated Avro {@link User}. */
+	public final static class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
+
+		@Override
+		public User map(Tuple3<String, Integer, String> value) throws Exception {
+			return new User(value.f0, value.f1, value.f2);
+		}
+	}
+
+	/** Maps the generated Avro {@link User} to the plain Java {@link ReflectiveUser}. */
+	public final static class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
+
+		@Override
+		public ReflectiveUser map(User value) throws Exception {
+			return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
+		}
+	}
+
+
+	/** Plain Java user type that is written via Avro's reflection-based writer. */
+	public static class ReflectiveUser {
+		private String name;
+		private int favoriteNumber;
+		private String favoriteColor;
+
+		public ReflectiveUser() {}
+
+		public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
+			this.name = name;
+			this.favoriteNumber = favoriteNumber;
+			this.favoriteColor = favoriteColor;
+		}
+
+		public String getName() {
+			return this.name;
+		}
+		public String getFavoriteColor() {
+			return this.favoriteColor;
+		}
+		public int getFavoriteNumber() {
+			return this.favoriteNumber;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
new file mode 100644
index 0000000..c39db15
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.flink.api.io.avro.generated.Address;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.Fixed16;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.util.StringUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization.
+ */
+public class EncoderDecoderTest {
+ @Test
+ public void testComplexStringsDirecty() {
+ try {
+ Random rnd = new Random(349712539451944123L);
+
+ for (int i = 0; i < 10; i++) {
+ String testString = StringUtils.getRandomString(rnd, 10, 100);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+ {
+ DataOutputStream dataOut = new DataOutputStream(baos);
+ DataOutputEncoder encoder = new DataOutputEncoder();
+ encoder.setOut(dataOut);
+
+ encoder.writeString(testString);
+ dataOut.flush();
+ dataOut.close();
+ }
+
+ byte[] data = baos.toByteArray();
+
+ // deserialize
+ {
+ ByteArrayInputStream bais = new ByteArrayInputStream(data);
+ DataInputStream dataIn = new DataInputStream(bais);
+ DataInputDecoder decoder = new DataInputDecoder();
+ decoder.setIn(dataIn);
+
+ String deserialized = decoder.readString();
+
+ assertEquals(testString, deserialized);
+ }
+ }
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test failed due to an exception: " + e.getMessage());
+ }
+ }
+
+ /**
+  * Round-trips boxed primitives (including boundary and non-finite values), strings and a
+  * small object made only of primitive fields and a string.
+  */
+ @Test
+ public void testPrimitiveTypes() {
+
+     // use the cached constants rather than the boxing constructors, like the valueOf calls below
+     testObjectSerialization(Boolean.TRUE);
+     testObjectSerialization(Boolean.FALSE);
+
+     testObjectSerialization(Byte.valueOf((byte) 0));
+     testObjectSerialization(Byte.valueOf((byte) 1));
+     testObjectSerialization(Byte.valueOf((byte) -1));
+     testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE));
+     testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE));
+
+     testObjectSerialization(Short.valueOf((short) 0));
+     testObjectSerialization(Short.valueOf((short) 1));
+     testObjectSerialization(Short.valueOf((short) -1));
+     testObjectSerialization(Short.valueOf(Short.MIN_VALUE));
+     testObjectSerialization(Short.valueOf(Short.MAX_VALUE));
+
+     testObjectSerialization(Integer.valueOf(0));
+     testObjectSerialization(Integer.valueOf(1));
+     testObjectSerialization(Integer.valueOf(-1));
+     testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE));
+     testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE));
+
+     testObjectSerialization(Long.valueOf(0));
+     testObjectSerialization(Long.valueOf(1));
+     testObjectSerialization(Long.valueOf(-1));
+     testObjectSerialization(Long.valueOf(Long.MIN_VALUE));
+     testObjectSerialization(Long.valueOf(Long.MAX_VALUE));
+
+     testObjectSerialization(Float.valueOf(0));
+     testObjectSerialization(Float.valueOf(1));
+     testObjectSerialization(Float.valueOf(-1));
+     testObjectSerialization(Float.valueOf((float)Math.E));
+     testObjectSerialization(Float.valueOf((float)Math.PI));
+     testObjectSerialization(Float.valueOf(Float.MIN_VALUE));
+     testObjectSerialization(Float.valueOf(Float.MAX_VALUE));
+     testObjectSerialization(Float.valueOf(Float.MIN_NORMAL));
+     testObjectSerialization(Float.valueOf(Float.NaN));
+     testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY));
+     testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY));
+
+     testObjectSerialization(Double.valueOf(0));
+     testObjectSerialization(Double.valueOf(1));
+     testObjectSerialization(Double.valueOf(-1));
+     testObjectSerialization(Double.valueOf(Math.E));
+     testObjectSerialization(Double.valueOf(Math.PI));
+     testObjectSerialization(Double.valueOf(Double.MIN_VALUE));
+     testObjectSerialization(Double.valueOf(Double.MAX_VALUE));
+     testObjectSerialization(Double.valueOf(Double.MIN_NORMAL));
+     testObjectSerialization(Double.valueOf(Double.NaN));
+     testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY));
+     testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY));
+
+     testObjectSerialization("");
+     testObjectSerialization("abcdefg");
+     testObjectSerialization("ab\u1535\u0155xyz\u706F");
+
+     testObjectSerialization(new SimpleTypes(3637, 54876486548L, (byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523));
+     testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) -65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 0.0000001));
+ }
+
+ /**
+  * Round-trips non-empty {@code int}, {@code long}, {@code float}, {@code double} and
+  * {@code String} arrays.
+  */
+ @Test
+ public void testArrayTypes() {
+     testObjectSerialization(new int[] {1, 2, 3, 4, 5});
+     testObjectSerialization(new long[] {1, 2, 3, 4, 5});
+     testObjectSerialization(new float[] {1, 2, 3, 4, 5});
+     testObjectSerialization(new double[] {1, 2, 3, 4, 5});
+     testObjectSerialization(new String[] {"Oh", "my", "what", "do", "we", "have", "here", "?"});
+ }
+
+ /**
+  * Round-trips zero-length {@code int}, {@code long}, {@code float}, {@code double} and
+  * {@code String} arrays.
+  */
+ @Test
+ public void testEmptyArray() {
+     testObjectSerialization(new int[0]);
+     testObjectSerialization(new long[0]);
+     testObjectSerialization(new float[0]);
+     testObjectSerialization(new double[0]);
+     testObjectSerialization(new String[0]);
+ }
+
+ /**
+  * Round-trips user-defined objects: one holding only primitives and a string, one holding a
+  * populated list, and one holding an empty list.
+  */
+ @Test
+ public void testObjects() {
+     // simple object containing only primitives
+     testObjectSerialization(new Book(976243875L, "The Serialization Odysse", 42));
+
+     // object with collection
+     ArrayList<String> titles = new ArrayList<String>(Arrays.asList("A", "B", "C", "D", "E"));
+     testObjectSerialization(new BookAuthor(976243875L, titles, "Arno Nym"));
+
+     // object with empty collection
+     ArrayList<String> noTitles = new ArrayList<String>();
+     testObjectSerialization(new BookAuthor(987654321L, noTitles, "The Saurus"));
+ }
+
+ /**
+ * Round-trips a {@link ComplexNestedObject2}, i.e. an object holding a map whose values are
+ * objects that themselves hold a list of strings.
+ */
+ @Test
+ public void testNestedObjectsWithCollections() {
+ testObjectSerialization(new ComplexNestedObject2(true));
+ }
+
+ /**
+  * Round-trips an Avro-generated {@link User} in which several constructor arguments are
+  * {@code null}, alongside list, map, enum, fixed and nested-record values.
+  */
+ @Test
+ public void testGeneratedObjectWithNullableFields() {
+     List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" });
+     List<Boolean> bools = Arrays.asList(true, true, false, false, true, false, true, true);
+     Map<CharSequence, Long> map = new HashMap<CharSequence, Long>();
+     map.put("1", 1L);
+     map.put("2", 2L);
+     map.put("3", 3L);
+
+     // seeded like the other random data in this test class, so a failure can be reproduced
+     byte[] b = new byte[16];
+     new Random(7254891036452178L).nextBytes(b);
+     Fixed16 f = new Fixed16(b);
+     Address addr = new Address(Integer.valueOf(239), "6th Main", "Bangalore",
+             "Karnataka", "560075");
+     User user = new User("Freudenreich", 1337, "macintosh gray",
+             1234567890L, 3.1415926, null, true, strings, bools, null,
+             Colors.GREEN, map, f, Boolean.TRUE, addr);
+
+     testObjectSerialization(user);
+ }
+
+ @Test
+ public void testVarLenCountEncoding() {
+ try {
+ long[] values = new long[] { 0, 1, 2, 3, 4, 0, 574, 45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL};
+
+ // write
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+ {
+ DataOutputStream dataOut = new DataOutputStream(baos);
+
+ for (long val : values) {
+ DataOutputEncoder.writeVarLongCount(dataOut, val);
+ }
+
+ dataOut.flush();
+ dataOut.close();
+ }
+
+ // read
+ {
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ DataInputStream dataIn = new DataInputStream(bais);
+
+ for (long val : values) {
+ long read = DataInputDecoder.readVarLongCount(dataIn);
+ assertEquals("Wrong var-len encoded value read.", val, read);
+ }
+ }
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test failed due to an exception: " + e.getMessage());
+ }
+ }
+
+ /**
+  * Serializes {@code obj} with a {@link ReflectDatumWriter} through a {@code DataOutputEncoder},
+  * reads it back with a {@link ReflectDatumReader} through a {@code DataInputDecoder}, and
+  * asserts that the copy equals the original (element-wise for arrays).
+  *
+  * <p>Any exception during the round trip fails the calling test.
+  *
+  * @param obj the object to round-trip; must not be {@code null}
+  */
+ private static <X> void testObjectSerialization(X obj) {
+
+     try {
+         @SuppressWarnings("unchecked")
+         final Class<X> clazz = (Class<X>) obj.getClass();
+
+         // serialize
+         ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+         {
+             DataOutputStream dataOut = new DataOutputStream(baos);
+             DataOutputEncoder encoder = new DataOutputEncoder();
+             encoder.setOut(dataOut);
+
+             ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz);
+
+             writer.write(obj, encoder);
+             dataOut.flush();
+             dataOut.close();
+         }
+
+         byte[] data = baos.toByteArray();
+         X result = null;
+
+         // deserialize
+         {
+             ByteArrayInputStream bais = new ByteArrayInputStream(data);
+             DataInputStream dataIn = new DataInputStream(bais);
+             DataInputDecoder decoder = new DataInputDecoder();
+             decoder.setIn(dataIn);
+
+             ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz);
+
+             // create a reuse object if possible, otherwise we have no reuse object
+             X reuse = null;
+             try {
+                 reuse = clazz.newInstance();
+             } catch (Exception ignored) {
+                 // Expected for types without an accessible no-arg constructor (boxed
+                 // primitives, arrays, ...): read without a reuse object. Errors are
+                 // deliberately not caught here so that they surface instead of being hidden.
+             }
+
+             result = reader.read(reuse, decoder);
+         }
+
+         // check
+         final String message = "Deserialized object is not the same as the original";
+
+         if (clazz.isArray()) {
+             // primitive arrays need their dedicated assertArrayEquals overloads
+             Class<?> arrayClass = obj.getClass();
+             if (arrayClass == byte[].class) {
+                 assertArrayEquals(message, (byte[]) obj, (byte[]) result);
+             }
+             else if (arrayClass == short[].class) {
+                 assertArrayEquals(message, (short[]) obj, (short[]) result);
+             }
+             else if (arrayClass == int[].class) {
+                 assertArrayEquals(message, (int[]) obj, (int[]) result);
+             }
+             else if (arrayClass == long[].class) {
+                 assertArrayEquals(message, (long[]) obj, (long[]) result);
+             }
+             else if (arrayClass == char[].class) {
+                 assertArrayEquals(message, (char[]) obj, (char[]) result);
+             }
+             else if (arrayClass == float[].class) {
+                 assertArrayEquals(message, (float[]) obj, (float[]) result, 0.0f);
+             }
+             else if (arrayClass == double[].class) {
+                 assertArrayEquals(message, (double[]) obj, (double[]) result, 0.0);
+             } else {
+                 assertArrayEquals(message, (Object[]) obj, (Object[]) result);
+             }
+         } else {
+             assertEquals(message, obj, result);
+         }
+     }
+     catch (Exception e) {
+         System.err.println(e.getMessage());
+         e.printStackTrace();
+         fail("Test failed due to an exception: " + e.getMessage());
+     }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Test Objects
+ // --------------------------------------------------------------------------------------------
+
+
+ public static final class SimpleTypes {
+
+ private final int iVal;
+ private final long lVal;
+ private final byte bVal;
+ private final String sVal;
+ private final short rVal;
+ private final double dVal;
+
+
+ public SimpleTypes() {
+ this(0, 0, (byte) 0, "", (short) 0, 0);
+ }
+
+ public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
+ this.iVal = iVal;
+ this.lVal = lVal;
+ this.bVal = bVal;
+ this.sVal = sVal;
+ this.rVal = rVal;
+ this.dVal = dVal;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == SimpleTypes.class) {
+ SimpleTypes other = (SimpleTypes) obj;
+
+ return other.iVal == this.iVal &&
+ other.lVal == this.lVal &&
+ other.bVal == this.bVal &&
+ other.sVal.equals(this.sVal) &&
+ other.rVal == this.rVal &&
+ other.dVal == this.dVal;
+
+ } else {
+ return false;
+ }
+ }
+ }
+
+ /**
+  * Test type holding a double and a list of strings.
+  */
+ public static class ComplexNestedObject1 {
+
+     private double doubleValue;
+
+     private List<String> stringList;
+
+     /** Leaves {@code stringList} as {@code null}. */
+     public ComplexNestedObject1() {}
+
+     /**
+      * @param offInit offset added to the double value and appended to every list entry, so that
+      *                instances created with different offsets differ
+      */
+     public ComplexNestedObject1(int offInit) {
+         this.doubleValue = 6293485.6723 + offInit;
+
+         this.stringList = new ArrayList<String>();
+         this.stringList.add("A" + offInit);
+         this.stringList.add("somewhat" + offInit);
+         this.stringList.add("random" + offInit);
+         this.stringList.add("collection" + offInit);
+         this.stringList.add("of" + offInit);
+         this.stringList.add("strings" + offInit);
+     }
+
+     /**
+      * Field-wise equality; returns {@code false} (instead of throwing) for {@code null} and for
+      * objects of any other class, and tolerates a {@code null} list on either side.
+      */
+     @Override
+     public boolean equals(Object obj) {
+         if (obj == null || obj.getClass() != ComplexNestedObject1.class) {
+             return false;
+         }
+         ComplexNestedObject1 other = (ComplexNestedObject1) obj;
+         return other.doubleValue == this.doubleValue &&
+                 (this.stringList == null ? other.stringList == null : this.stringList.equals(other.stringList));
+     }
+
+     /**
+      * Consistent with {@link #equals(Object)}; based on the list only, because the double is
+      * compared with {@code ==} ({@code 0.0} equals {@code -0.0}).
+      */
+     @Override
+     public int hashCode() {
+         return stringList == null ? 0 : stringList.hashCode();
+     }
+ }
+
+ public static class ComplexNestedObject2 {
+
+ private long longValue;
+
+ private Map<String, ComplexNestedObject1> theMap;
+
+ public ComplexNestedObject2() {}
+
+ public ComplexNestedObject2(boolean init) {
+ this.longValue = 46547;
+
+ this.theMap = new HashMap<String, ComplexNestedObject1>();
+ this.theMap.put("36354L", new ComplexNestedObject1(43546543));
+ this.theMap.put("785611L", new ComplexNestedObject1(45784568));
+ this.theMap.put("43L", new ComplexNestedObject1(9876543));
+ this.theMap.put("-45687L", new ComplexNestedObject1(7897615));
+ this.theMap.put("1919876876896L", new ComplexNestedObject1(27154));
+ this.theMap.put("-868468468L", new ComplexNestedObject1(546435));
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == ComplexNestedObject2.class) {
+ ComplexNestedObject2 other = (ComplexNestedObject2) obj;
+ return other.longValue == this.longValue && this.theMap.equals(other.theMap);
+ } else {
+ return false;
+ }
+ }
+ }
+
+ public static class Book {
+
+ private long bookId;
+ private String title;
+ private long authorId;
+
+ public Book() {}
+
+ public Book(long bookId, String title, long authorId) {
+ this.bookId = bookId;
+ this.title = title;
+ this.authorId = authorId;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == Book.class) {
+ Book other = (Book) obj;
+ return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title);
+ } else {
+ return false;
+ }
+ }
+ }
+
+ public static class BookAuthor {
+
+ private long authorId;
+ private List<String> bookTitles;
+ private String authorName;
+
+ public BookAuthor() {}
+
+ public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
+ this.authorId = authorId;
+ this.bookTitles = bookTitles;
+ this.authorName = authorName;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == BookAuthor.class) {
+ BookAuthor other = (BookAuthor) obj;
+ return other.authorName.equals(this.authorName) && other.authorId == this.authorId &&
+ other.bookTitles.equals(this.bookTitles);
+ } else {
+ return false;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
new file mode 100644
index 0000000..1174786
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.avro.testjar;
+
+// ================================================================================================
+// This file defines the classes for the AvroExternalJarProgramITCase.
+// The program is exported into src/test/resources/AvroTestProgram.jar.
+//
+// THIS FILE MUST STAY FULLY COMMENTED SUCH THAT THE HERE DEFINED CLASSES ARE NOT COMPILED
+// AND ADDED TO THE test-classes DIRECTORY. OTHERWISE, THE EXTERNAL CLASS LOADING WILL
+// NOT BE COVERED BY THIS TEST.
+// ================================================================================================
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.core.fs.Path;
+
+public class AvroExternalJarProgram {
+
+ public static final class Color {
+
+ private String name;
+ private double saturation;
+
+ public Color() {
+ name = "";
+ saturation = 1.0;
+ }
+
+ public Color(String name, double saturation) {
+ this.name = name;
+ this.saturation = saturation;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public double getSaturation() {
+ return saturation;
+ }
+
+ public void setSaturation(double saturation) {
+ this.saturation = saturation;
+ }
+
+ @Override
+ public String toString() {
+ return name + '(' + saturation + ')';
+ }
+ }
+
+ public static final class MyUser {
+
+ private String name;
+ private List<Color> colors;
+
+ public MyUser() {
+ name = "unknown";
+ colors = new ArrayList<Color>();
+ }
+
+ public MyUser(String name, List<Color> colors) {
+ this.name = name;
+ this.colors = colors;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public List<Color> getColors() {
+ return colors;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public void setColors(List<Color> colors) {
+ this.colors = colors;
+ }
+
+ @Override
+ public String toString() {
+ return name + " : " + colors;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+  * Pairs every user with the first character of its name.
+  */
+ public static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> {
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public Tuple2<String, MyUser> map(MyUser u) {
+         // the one-character prefix becomes field 0 of the emitted tuple
+         return new Tuple2<String, MyUser>(u.getName().substring(0, 1), u);
+     }
+ }
+
+ /**
+ * Reduce function that always returns its first argument and discards the second.
+ */
+ public static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) {
+ return val1;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Test Data
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Produces pseudo-random {@link MyUser} records from a fixed-seed {@link Random}.
+ * Every value is drawn from the single shared generator, so the produced sequence depends
+ * on the order of the draws below.
+ */
+ public static final class Generator {
+
+ // fixed seed
+ private final Random rnd = new Random(2389756789345689276L);
+
+ /** Returns the next generated user. */
+ public MyUser nextUser() {
+ return randomUser();
+ }
+
+ /** Draws the number of colors (0 to 4), then each color (name, saturation), then the user name. */
+ private MyUser randomUser() {
+
+ int numColors = rnd.nextInt(5);
+ ArrayList<Color> colors = new ArrayList<Color>(numColors);
+ for (int i = 0; i < numColors; i++) {
+ colors.add(new Color(randomString(), rnd.nextDouble()));
+ }
+
+ return new MyUser(randomString(), colors);
+ }
+
+ /** Builds a string of 5 to 24 characters with char codes in the range 40 to 189. */
+ private String randomString() {
+ char[] c = new char[this.rnd.nextInt(20) + 5];
+
+ for (int i = 0; i < c.length; i++) {
+ c[i] = (char) (this.rnd.nextInt(150) + 40);
+ }
+
+ return new String(c);
+ }
+ }
+
+ /**
+  * Writes {@code numRecords} generated {@link MyUser} records to {@code testFile} as an Avro
+  * data file, using the reflect-derived schema of {@link MyUser}.
+  *
+  * @param testFile   the file to create
+  * @param numRecords the number of records to append
+  * @throws IOException if the file cannot be created or written
+  */
+ public static void writeTestData(File testFile, int numRecords) throws IOException {
+
+     DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class);
+     DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter);
+
+     // close the writer even when create/append fails, so the file handle is not leaked
+     try {
+         dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);
+
+         Generator generator = new Generator();
+
+         for (int i = 0; i < numRecords; i++) {
+             MyUser user = generator.nextUser();
+             dataFileWriter.append(user);
+         }
+     } finally {
+         dataFileWriter.close();
+     }
+ }
+
+// public static void main(String[] args) throws Exception {
+// String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath();
+// writeTestData(new File(testDataFile), 50);
+// }
+
+ /**
+  * Reads {@link MyUser} records from the Avro file given as the first argument, groups them by
+  * the first character of the user name, and discards the result.
+  *
+  * @param args {@code args[0]} is the path of the Avro input file
+  * @throws IllegalArgumentException if no input path is given
+  */
+ public static void main(String[] args) throws Exception {
+     if (args.length < 1) {
+         // fail with a descriptive message instead of a bare ArrayIndexOutOfBoundsException
+         throw new IllegalArgumentException("Missing argument: <path to avro input file>");
+     }
+     String inputPath = args[0];
+
+     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class));
+
+     DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper());
+
+     result.output(new DiscardingOutputFormat<Tuple2<String,MyUser>>());
+     env.execute();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
new file mode 100644
index 0000000..f33f433
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.io.avro;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(Parameterized.class)
+public class AvroPojoTest extends MultipleProgramsTestBase {
+ /** Passes the execution mode supplied by the parameterized runner on to the base class. */
+ public AvroPojoTest(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ private File inFile;
+ private String resultPath;
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ /**
+ * Creates a result file and an input file in the temporary folder and fills the input file
+ * via {@code AvroRecordInputFormatTest.writeTestFile}.
+ */
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ inFile = tempFolder.newFile();
+ AvroRecordInputFormatTest.writeTestFile(inFile);
+ }
+
+ /**
+ * Compares the output at {@code resultPath} with {@code expected}. Runs after every test, so
+ * each test has to assign {@code expected} before it returns.
+ */
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expected, resultPath);
+ }
+
+ /**
+ * Reads the test users through {@link AvroInputFormat}, nulls their map field and writes them
+ * as text; the comparison against {@code expected} happens in {@link #after()}.
+ */
+ @Test
+ public void testSimpleAvroRead() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+ AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+ DataSet<User> usersDS = env.createInput(users)
+ // null map type because the order changes in different JVMs (hard to test)
+ .map(new MapFunction<User, User>() {
+ @Override
+ public User map(User value) throws Exception {
+ value.setTypeMap(null);
+ return value;
+ }
+ });
+
+ usersDS.writeAsText(resultPath);
+
+ env.execute("Simple Avro read job");
+
+
+ expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
+ "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
+ }
+
+ /**
+ * Same pipeline as the simple read test, but with {@code enableForceAvro()} set on the
+ * execution config and with the map field replaced by a fixed single-entry map.
+ */
+ @Test
+ public void testSerializeWithAvro() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableForceAvro();
+ Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+ AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+ DataSet<User> usersDS = env.createInput(users)
+ // replace the map with a fixed single-entry map (presumably so the printed output does not depend on map iteration order)
+ .map(new MapFunction<User, User>() {
+ @Override
+ public User map(User value) throws Exception {
+ Map<CharSequence, Long> ab = new HashMap<CharSequence, Long>(1);
+ ab.put("hehe", 12L);
+ value.setTypeMap(ab);
+ return value;
+ }
+ });
+
+ usersDS.writeAsText(resultPath);
+
+ env.execute("Simple Avro read job");
+
+
+ expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
+ "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
+
+ }
+
+ /**
+ * Groups the users by the field expression {@code "name"} with object reuse enabled and emits
+ * one {@code (name, 1)} tuple per user.
+ */
+ @Test
+ public void testKeySelection() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
+ Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+ AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+ DataSet<User> usersDS = env.createInput(users);
+
+ DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
+ @Override
+ public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (User u : values) {
+ out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+ }
+ }
+ });
+ res.writeAsText(resultPath);
+ env.execute("Avro Key selection");
+
+
+ expected = "(Alyssa,1)\n(Charlie,1)\n";
+ }
+
+ /**
+ * Groups the users by name through a {@link KeySelector} with {@code enableForceAvro()} set on
+ * the execution config and emits one {@code (name, 1)} tuple per user.
+ */
+ @Test
+ public void testWithAvroGenericSer() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableForceAvro();
+ Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+ AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+ DataSet<User> usersDS = env.createInput(users);
+
+ DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() {
+ @Override
+ public String getKey(User value) throws Exception {
+ return String.valueOf(value.getName());
+ }
+ }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
+ @Override
+ public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+ for(User u : values) {
+ out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+ }
+ }
+ });
+
+ res.writeAsText(resultPath);
+ env.execute("Avro Key selection");
+
+
+ expected = "(Charlie,1)\n(Alyssa,1)\n";
+ }
+
+ /**
+ * Groups the users by name through a {@link KeySelector} with {@code enableForceKryo()} set on
+ * the execution config and emits one {@code (name, 1)} tuple per user.
+ */
+ @Test
+ public void testWithKryoGenericSer() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableForceKryo();
+ Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+ AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+ DataSet<User> usersDS = env.createInput(users);
+
+ DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() {
+ @Override
+ public String getKey(User value) throws Exception {
+ return String.valueOf(value.getName());
+ }
+ }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
+ @Override
+ public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (User u : values) {
+ out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+ }
+ }
+ });
+
+ res.writeAsText(resultPath);
+ env.execute("Avro Key selection");
+
+
+ expected = "(Charlie,1)\n(Alyssa,1)\n";
+ }
+
+ /**
+ * Tests grouping on a few known fields: a string, an enum and a double field.
+ */
+ @Test
+ public void testAllFields() throws Exception {
+ for(String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) {
+ testField(fieldName);
+ }
+ }
+
+ /**
+  * Groups the test users by {@code fieldName}, emits the value of that field for every user and
+  * compares the output with the values expected for that field.
+  *
+  * <p>This helper is invoked several times from a single test method, so it calls
+  * {@link #before()} and {@link #after()} itself to get fresh files and a comparison per field.
+  *
+  * @param fieldName one of {@code "name"}, {@code "type_enum"}, {@code "type_double_test"}
+  */
+ private void testField(final String fieldName) throws Exception {
+     before();
+
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+     AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+     DataSet<User> usersDS = env.createInput(users);
+
+     DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
+         @Override
+         public void reduce(Iterable<User> values, Collector<Object> out) throws Exception {
+             for(User u : values) {
+                 out.collect(u.get(fieldName));
+             }
+         }
+     });
+     res.writeAsText(resultPath);
+     env.execute("Simple Avro read job");
+
+     // test if automatic registration of the Types worked
+     ExecutionConfig ec = env.getConfig();
+     Assert.assertTrue(ec.getRegisteredKryoTypes().contains(org.apache.flink.api.io.avro.generated.Fixed16.class));
+
+     // every expectation is newline-terminated, matching the other tests in this class
+     if(fieldName.equals("name")) {
+         expected = "Alyssa\nCharlie\n";
+     } else if(fieldName.equals("type_enum")) {
+         expected = "GREEN\nRED\n";
+     } else if(fieldName.equals("type_double_test")) {
+         expected = "123.45\n1.337\n";
+     } else {
+         Assert.fail("Unknown field");
+     }
+
+     after();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
new file mode 100644
index 0000000..91a9612
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.io.avro;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.io.avro.generated.Address;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests the Avro input format.
+ * (The test case mostly follows the Avro "Getting Started (Java)" tutorial.)
+ * http://avro.apache.org/docs/current/gettingstartedjava.html
+ */
+public class AvroRecordInputFormatTest {
+
+ /** Temporary Avro data file; assigned in createFiles() and deleted in deleteFiles(). */
+ public File testFile;
+
+ // ---- values written into the first test record (user1) by writeTestFile ----
+
+ static final String TEST_NAME = "Alyssa";
+
+ static final String TEST_ARRAY_STRING_1 = "ELEMENT 1";
+ static final String TEST_ARRAY_STRING_2 = "ELEMENT 2";
+
+ static final boolean TEST_ARRAY_BOOLEAN_1 = true;
+ static final boolean TEST_ARRAY_BOOLEAN_2 = false;
+
+ static final Colors TEST_ENUM_COLOR = Colors.GREEN;
+
+ static final String TEST_MAP_KEY1 = "KEY 1";
+ static final long TEST_MAP_VALUE1 = 8546456L;
+ static final String TEST_MAP_KEY2 = "KEY 2";
+ static final long TEST_MAP_VALUE2 = 17554L;
+
+ // ---- address used as the nested record of both test users ----
+
+ static final int TEST_NUM = 239;
+ static final String TEST_STREET = "Baker Street";
+ static final String TEST_CITY = "London";
+ static final String TEST_STATE = "London";
+ static final String TEST_ZIP = "NW1 6XE";
+
+
+ /** Schema of the generated User class; passed to the Avro readers and compared against read records. */
+ private final Schema userSchema = new User().getSchema();
+
+
+ /**
+  * Writes an Avro data file containing two {@link User} records: the first is populated
+  * through setters from the {@code TEST_*} constants, the second ("Charlie") is assembled
+  * with the generated builder.
+  *
+  * @param testFile the file to create and fill
+  * @throws IOException if the file cannot be created or written
+  */
+ public static void writeTestFile(File testFile) throws IOException {
+     List<CharSequence> stringArray = new ArrayList<>();
+     stringArray.add(TEST_ARRAY_STRING_1);
+     stringArray.add(TEST_ARRAY_STRING_2);
+
+     List<Boolean> booleanArray = new ArrayList<>();
+     booleanArray.add(TEST_ARRAY_BOOLEAN_1);
+     booleanArray.add(TEST_ARRAY_BOOLEAN_2);
+
+     Map<CharSequence, Long> longMap = new HashMap<>();
+     longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
+     longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
+
+     Address addr = new Address();
+     addr.setNum(TEST_NUM);
+     addr.setStreet(TEST_STREET);
+     addr.setCity(TEST_CITY);
+     addr.setState(TEST_STATE);
+     addr.setZip(TEST_ZIP);
+
+     // First record: populated through setters; fields that are not set here
+     // (e.g. type_long_test) are left untouched.
+     User user1 = new User();
+     user1.setName(TEST_NAME);
+     user1.setFavoriteNumber(256);
+     user1.setTypeDoubleTest(123.45d);
+     user1.setTypeBoolTest(true);
+     user1.setTypeArrayString(stringArray);
+     user1.setTypeArrayBoolean(booleanArray);
+     user1.setTypeEnum(TEST_ENUM_COLOR);
+     user1.setTypeMap(longMap);
+     user1.setTypeNested(addr);
+
+     // Second record: construct via builder
+     User user2 = User.newBuilder()
+             .setName("Charlie")
+             .setFavoriteColor("blue")
+             .setFavoriteNumber(null)
+             .setTypeBoolTest(false)
+             .setTypeDoubleTest(1.337d)
+             .setTypeNullTest(null)
+             .setTypeLongTest(1337L)
+             .setTypeArrayString(new ArrayList<CharSequence>())
+             .setTypeArrayBoolean(new ArrayList<Boolean>())
+             .setTypeNullableArray(null)
+             .setTypeEnum(Colors.RED)
+             .setTypeMap(new HashMap<CharSequence, Long>())
+             .setTypeFixed(null)
+             .setTypeUnion(null)
+             .setTypeNested(
+                     Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
+                             .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
+                             .build())
+             .build();
+
+     DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
+     // try-with-resources: the writer is closed even if create() or append() throws
+     try (DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter)) {
+         dataFileWriter.create(user1.getSchema(), testFile);
+         dataFileWriter.append(user1);
+         dataFileWriter.append(user2);
+     }
+ }
+ /**
+  * Creates a fresh temporary file before each test and fills it via {@code writeTestFile}.
+  *
+  * @throws IOException if the temporary file cannot be created or written
+  */
+ @Before
+ public void createFiles() throws IOException {
+     final File avroFile = File.createTempFile("AvroInputFormatTest", null);
+     // publish the file first so that it is known to the test instance even if writing fails
+     testFile = avroFile;
+     writeTestFile(avroFile);
+ }
+
+
+ /**
+  * Test if the AvroInputFormat is able to properly read data from an avro file
+  * into the generated {@link User} type.
+  *
+  * @throws IOException if the test file cannot be read
+  */
+ @Test
+ public void testDeserialisation() throws IOException {
+     Configuration parameters = new Configuration();
+
+     AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+
+     // try/finally so that close() is always called, even when an assertion fails
+     try {
+         format.configure(parameters);
+         FileInputSplit[] splits = format.createInputSplits(1);
+         // expected value first, so that a failure message reads correctly
+         assertEquals(1, splits.length);
+         format.open(splits[0]);
+
+         User u = format.nextRecord(null);
+         assertNotNull(u);
+
+         String name = u.getName().toString();
+         assertNotNull("empty record", name);
+         assertEquals("name not equal", TEST_NAME, name);
+
+         // check arrays
+         List<CharSequence> sl = u.getTypeArrayString();
+         assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
+         assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
+
+         List<Boolean> bl = u.getTypeArrayBoolean();
+         assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
+         assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+
+         // check enums
+         Colors enumValue = u.getTypeEnum();
+         assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
+
+         // check maps
+         Map<CharSequence, Long> lm = u.getTypeMap();
+         assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
+         assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+
+         // the file holds exactly two records: one more must follow, then the end is reached
+         assertFalse("expecting second element", format.reachedEnd());
+         assertNotNull("expecting second element", format.nextRecord(u));
+
+         assertNull(format.nextRecord(u));
+         assertTrue(format.reachedEnd());
+     } finally {
+         format.close();
+     }
+ }
+
+ /**
+  * Test if the AvroInputFormat is able to properly read data from an avro file
+  * into the generated {@link User} type when reuse of the Avro value is switched off.
+  *
+  * @throws IOException if the test file cannot be read
+  */
+ @Test
+ public void testDeserialisationReuseAvroRecordFalse() throws IOException {
+     Configuration parameters = new Configuration();
+
+     AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+     format.setReuseAvroValue(false);
+
+     // try/finally so that close() is always called, even when an assertion fails
+     try {
+         format.configure(parameters);
+         FileInputSplit[] splits = format.createInputSplits(1);
+         // expected value first, so that a failure message reads correctly
+         assertEquals(1, splits.length);
+         format.open(splits[0]);
+
+         User u = format.nextRecord(null);
+         assertNotNull(u);
+
+         String name = u.getName().toString();
+         assertNotNull("empty record", name);
+         assertEquals("name not equal", TEST_NAME, name);
+
+         // check arrays
+         List<CharSequence> sl = u.getTypeArrayString();
+         assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
+         assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
+
+         List<Boolean> bl = u.getTypeArrayBoolean();
+         assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
+         assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+
+         // check enums
+         Colors enumValue = u.getTypeEnum();
+         assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
+
+         // check maps
+         Map<CharSequence, Long> lm = u.getTypeMap();
+         assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
+         assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+
+         // the file holds exactly two records: one more must follow, then the end is reached
+         assertFalse("expecting second element", format.reachedEnd());
+         assertNotNull("expecting second element", format.nextRecord(u));
+
+         assertNull(format.nextRecord(u));
+         assertTrue(format.reachedEnd());
+     } finally {
+         format.close();
+     }
+ }
+
+ /**
+  * Test if the Flink serialization is able to properly process GenericData.Record types.
+  * Usually users of Avro generate classes (POJOs) from Avro schemas.
+  * However, if generated classes are not available, one can also use GenericData.Record.
+  * It is an untyped key-value record which is using a schema to validate the correctness of the data.
+  *
+  * It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead.
+  *
+  * @throws IOException if the test file cannot be read or the record cannot be (de)serialized
+  */
+ @Test
+ public void testDeserializeToGenericType() throws IOException {
+     DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema);
+
+     try (FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
+         // initialize Record by reading it from disk (that's easier than creating it by hand)
+         GenericData.Record rec = new GenericData.Record(userSchema);
+         dataFileReader.next(rec);
+
+         // check if record has been read correctly
+         assertNotNull(rec);
+         assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
+         assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
+         assertNull(rec.get("type_long_test")); // it is null for the first record.
+
+         // now serialize it with our framework:
+         TypeInformation<GenericData.Record> te = TypeExtractor.createTypeInfo(GenericData.Record.class);
+
+         ExecutionConfig ec = new ExecutionConfig();
+         Assert.assertEquals(GenericTypeInfo.class, te.getClass());
+
+         Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<Class<?>>());
+
+         TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
+         // exactly one default Kryo serializer is expected: the Avro schema serializer for Schema.
+         // assertEquals (instead of a compound assertTrue) reports the offending value on failure;
+         // a missing entry shows up as null.
+         Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
+         Assert.assertEquals(
+                 Serializers.AvroSchemaSerializer.class,
+                 ec.getDefaultKryoSerializerClasses().get(Schema.class));
+
+         ByteArrayOutputStream out = new ByteArrayOutputStream();
+         try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) {
+             tser.serialize(rec, outView);
+         }
+
+         GenericData.Record newRec;
+         try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
+                 new ByteArrayInputStream(out.toByteArray())))
+         {
+             newRec = tser.deserialize(inView);
+         }
+
+         // check if it is still the same
+         assertNotNull(newRec);
+         assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
+         assertEquals("name not equal", TEST_NAME, newRec.get("name").toString());
+         assertNull(newRec.get("type_long_test"));
+     }
+ }
+
+ /**
+  * Validates that a specific (generated POJO) record read from the Avro test file
+  * keeps its name and enum value after a round trip through the Flink serializer
+  * created for {@link User}.
+  *
+  * @throws IOException if the test file cannot be read or the record cannot be (de)serialized
+  */
+ @Test
+ public void testDeserializeToSpecificType() throws IOException {
+
+     final DatumReader<User> userReader = new SpecificDatumReader<>(userSchema);
+
+     try (FileReader<User> avroFile = DataFileReader.openReader(testFile, userReader)) {
+         final User original = avroFile.next();
+
+         // sanity-check the record as it was read from disk
+         assertNotNull(original);
+         assertEquals("name not equal", TEST_NAME, original.get("name").toString());
+         assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), original.get("type_enum").toString());
+
+         // the type extractor must produce the Avro type information for the generated class
+         final ExecutionConfig config = new ExecutionConfig();
+         final TypeInformation<User> userTypeInfo = TypeExtractor.createTypeInfo(User.class);
+
+         Assert.assertEquals(AvroTypeInfo.class, userTypeInfo.getClass());
+         final TypeSerializer<User> serializer = userTypeInfo.createSerializer(config);
+
+         // serialize into an in-memory buffer ...
+         final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+         try (DataOutputViewStreamWrapper target = new DataOutputViewStreamWrapper(buffer)) {
+             serializer.serialize(original, target);
+         }
+
+         // ... and read the record back from the serialized bytes
+         User copy;
+         final ByteArrayInputStream serializedBytes = new ByteArrayInputStream(buffer.toByteArray());
+         try (DataInputViewStreamWrapper source = new DataInputViewStreamWrapper(serializedBytes)) {
+             copy = serializer.deserialize(source);
+         }
+
+         // the copy must still carry the same name and enum value
+         assertNotNull(copy);
+         assertEquals("name not equal", TEST_NAME, copy.getName().toString());
+         assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), copy.getTypeEnum().toString());
+     }
+ }
+
+ /**
+  * Test if the AvroInputFormat is able to properly read data from an Avro
+  * file as a GenericRecord.
+  *
+  * @throws IOException if there is an exception
+  */
+ @Test
+ public void testDeserialisationGenericRecord() throws IOException {
+     final Configuration parameters = new Configuration();
+     final Path avroPath = new Path(testFile.getAbsolutePath());
+
+     final AvroInputFormat<GenericRecord> genericFormat =
+             new AvroInputFormat<GenericRecord>(avroPath, GenericRecord.class);
+
+     // all reading and checking is done by the shared helper
+     doTestDeserializationGenericRecord(genericFormat, parameters);
+ }
+
+ /**
+  * Helper method to test GenericRecord deserialisation: configures and opens the given
+  * format, checks the first record field by field, verifies that exactly one more record
+  * follows, and always closes the format.
+  *
+  * @param format
+  *            the format to test
+  * @param parameters
+  *            the configuration to use
+  * @throws IOException
+  *             thrown if there is an issue
+  */
+ @SuppressWarnings("unchecked")
+ private void doTestDeserializationGenericRecord(final AvroInputFormat<GenericRecord> format,
+         final Configuration parameters) throws IOException {
+     try {
+         format.configure(parameters);
+         FileInputSplit[] splits = format.createInputSplits(1);
+         // expected value first, so that a failure message reads correctly
+         assertEquals(1, splits.length);
+         format.open(splits[0]);
+
+         GenericRecord u = format.nextRecord(null);
+         assertNotNull(u);
+         assertEquals("The schemas should be equal", userSchema, u.getSchema());
+
+         String name = u.get("name").toString();
+         assertNotNull("empty record", name);
+         assertEquals("name not equal", TEST_NAME, name);
+
+         // check arrays (GenericRecord.get returns Object, hence the unchecked casts)
+         List<CharSequence> sl = (List<CharSequence>) u.get("type_array_string");
+         assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
+         assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
+
+         List<Boolean> bl = (List<Boolean>) u.get("type_array_boolean");
+         assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
+         assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+
+         // check enums
+         GenericData.EnumSymbol enumValue = (GenericData.EnumSymbol) u.get("type_enum");
+         assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), enumValue.toString());
+
+         // check maps
+         Map<CharSequence, Long> lm = (Map<CharSequence, Long>) u.get("type_map");
+         assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
+         assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+
+         // the file holds exactly two records: one more must follow, then the end is reached
+         assertFalse("expecting second element", format.reachedEnd());
+         assertNotNull("expecting second element", format.nextRecord(u));
+
+         assertNull(format.nextRecord(u));
+         assertTrue(format.reachedEnd());
+     } finally {
+         format.close();
+     }
+ }
+
+ /**
+  * Test if the AvroInputFormat is able to properly read data from an avro
+  * file as a GenericRecord when reuse of the Avro value is switched off.
+  *
+  * @throws IOException if there is an error
+  */
+ @Test
+ public void testDeserialisationGenericRecordReuseAvroValueFalse() throws IOException {
+     final Configuration parameters = new Configuration();
+     final Path avroPath = new Path(testFile.getAbsolutePath());
+
+     final AvroInputFormat<GenericRecord> genericFormat =
+             new AvroInputFormat<GenericRecord>(avroPath, GenericRecord.class);
+     // NOTE(review): doTestDeserializationGenericRecord calls configure(parameters) again,
+     // so this first call looks redundant - confirm before removing it.
+     genericFormat.configure(parameters);
+     genericFormat.setReuseAvroValue(false);
+
+     doTestDeserializationGenericRecord(genericFormat, parameters);
+ }
+
+ /**
+  * Removes the temporary Avro file after each test.
+  */
+ @After
+ public void deleteFiles() {
+     // JUnit runs @After methods even when a @Before method failed; in that case the file
+     // may never have been assigned, and an NPE here would only add a misleading failure.
+     if (testFile != null) {
+         // best-effort cleanup of a temp file: the result of delete() is deliberately ignored
+         testFile.delete();
+     }
+ }
+
+}