You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/30 19:59:01 UTC
[2/9] git commit: [FLINK-1117] Clean up flink-avro project: remove deprecated AvroRecord format, migrate tests to new java api.
[FLINK-1117] Clean up flink-avro project: remove deprecated AvroRecord format, migrate tests to new java api.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/626d6b78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/626d6b78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/626d6b78
Branch: refs/heads/master
Commit: 626d6b785db649e06951e2f336f5ca411b30dce5
Parents: 38e4755
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 30 14:15:22 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:58 2014 +0200
----------------------------------------------------------------------
flink-addons/flink-avro/pom.xml | 7 +-
.../apache/flink/api/avro/AvroBaseValue.java | 153 ----
.../api/io/avro/example/AvroTypeExample.java | 111 +++
.../apache/flink/api/io/avro/example/User.java | 269 +++++++
.../java/record/io/avro/AvroInputFormat.java | 111 ---
.../record/io/avro/AvroRecordInputFormat.java | 374 ---------
.../avro/example/ReflectiveAvroTypeExample.java | 161 ----
.../api/java/record/io/avro/example/SUser.java | 25 -
.../api/java/record/io/avro/example/User.java | 269 -------
.../flink/api/avro/AvroOutputFormatTest.java | 2 +-
.../api/avro/AvroWithEmptyArrayITCase.java | 217 ------
.../flink/api/avro/EncoderDecoderTest.java | 4 +-
.../avro/testjar/AvroExternalJarProgram.java | 13 -
.../api/io/avro/AvroRecordInputFormatTest.java | 167 ++++
.../flink/api/io/avro/generated/Colors.java | 32 +
.../flink/api/io/avro/generated/User.java | 755 +++++++++++++++++++
.../io/avro/AvroRecordInputFormatTest.java | 167 ----
.../java/record/io/avro/generated/Colors.java | 32 -
.../api/java/record/io/avro/generated/User.java | 755 -------------------
.../runtime/AvroSerializerEmptyArrayTest.java | 189 +++++
tools/maven/suppressions.xml | 2 +-
21 files changed, 1531 insertions(+), 2284 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/pom.xml b/flink-addons/flink-avro/pom.xml
index 68f722c..ff4decc 100644
--- a/flink-addons/flink-avro/pom.xml
+++ b/flink-addons/flink-avro/pom.xml
@@ -54,10 +54,11 @@ under the License.
<!-- version is derived from base module -->
</dependency>
+ <!-- guava needs to be in "provided" scope, so as not to re-export the dependency -->
<dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-mapred</artifactId>
- <!-- version is derived from base module -->
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
deleted file mode 100644
index 0d64910..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
-import org.apache.flink.util.ReflectionUtil;
-
-
-public abstract class AvroBaseValue<T> extends AvroValue<T> implements Key<AvroBaseValue<T>> {
-
- private static final long serialVersionUID = 1L;
-
-
- public AvroBaseValue() {}
-
- public AvroBaseValue(T datum) {
- super(datum);
- }
-
-
- // --------------------------------------------------------------------------------------------
- // Serialization / Deserialization
- // --------------------------------------------------------------------------------------------
-
- private ReflectDatumWriter<T> writer;
- private ReflectDatumReader<T> reader;
-
- private DataOutputEncoder encoder;
- private DataInputDecoder decoder;
-
-
- @Override
- public void write(DataOutputView out) throws IOException {
- // the null flag
- if (datum() == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
-
- DataOutputEncoder encoder = getEncoder();
- encoder.setOut(out);
- getWriter().write(datum(), encoder);
- }
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- // the null flag
- if (in.readBoolean()) {
-
- DataInputDecoder decoder = getDecoder();
- decoder.setIn(in);
- datum(getReader().read(datum(), decoder));
- }
- }
-
- private ReflectDatumWriter<T> getWriter() {
- if (this.writer == null) {
- @SuppressWarnings("unchecked")
- Class<T> clazz = (Class<T>) datum().getClass();
- this.writer = new ReflectDatumWriter<T>(clazz);
- }
- return this.writer;
- }
-
- private ReflectDatumReader<T> getReader() {
- if (this.reader == null) {
- Class<T> datumClass = ReflectionUtil.getTemplateType1(getClass());
- this.reader = new ReflectDatumReader<T>(datumClass);
- }
- return this.reader;
- }
-
- private DataOutputEncoder getEncoder() {
- if (this.encoder == null) {
- this.encoder = new DataOutputEncoder();
- }
- return this.encoder;
- }
-
- private DataInputDecoder getDecoder() {
- if (this.decoder == null) {
- this.decoder = new DataInputDecoder();
- }
- return this.decoder;
- }
-
- // --------------------------------------------------------------------------------------------
- // Hashing / Equality
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return datum() == null ? 0 : datum().hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == this.getClass()) {
- Object otherDatum = ((AvroBaseValue<?>) obj).datum();
- Object thisDatum = datum();
-
- if (thisDatum == null) {
- return otherDatum == null;
- } else {
- return thisDatum.equals(otherDatum);
- }
- } else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return "AvroBaseValue (" + datum() + ")";
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public int compareTo(AvroBaseValue<T> o) {
- Object otherDatum = o.datum();
- Object thisDatum = datum();
-
- if (thisDatum == null) {
- return otherDatum == null ? 0 : -1;
- } else {
- return otherDatum == null ? 1: ((Comparable<Object>) thisDatum).compareTo(otherDatum);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
new file mode 100644
index 0000000..6affeec
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.io.avro.example;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.util.Collector;
+
+/** Example program that uses the generated Avro {@link User} class as a regular data type. */
+@SuppressWarnings("serial")
+public class AvroTypeExample {
+
+	public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<User> users = env.createInput(new UserGeneratingInputFormat());
+
+		users
+			.map(new NumberExtractingMapper())
+			.groupBy(1)
+			.reduceGroup(new ConcatenatingReducer())
+			.print();
+
+		env.execute();
+	}
+
+	/** Pairs each user with its favorite number, which is then used as the grouping key (field 1). */
+	public static final class NumberExtractingMapper implements MapFunction<User, Tuple2<User, Integer>> {
+		@Override
+		public Tuple2<User, Integer> map(User user) {
+			return new Tuple2<User, Integer>(user, user.getFavoriteNumber());
+		}
+	}
+
+	/** Emits each favorite number with the favorite colors of its group, joined by " - ". */
+	public static final class ConcatenatingReducer implements GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> {
+		@Override
+		public void reduce(Iterable<Tuple2<User, Integer>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
+			int number = 0;
+			StringBuilder colors = new StringBuilder();
+
+			for (Tuple2<User, Integer> u : values) {
+				number = u.f1;
+				colors.append(u.f0.getFavoriteColor()).append(" - ");
+			}
+
+			// strip the trailing " - " (assumes the group holds at least one element)
+			colors.setLength(colors.length() - 3);
+			out.collect(new Tuple2<Integer, String>(number, colors.toString()));
+		}
+	}
+
+	/** Produces NUM users with random names, favorite colors and favorite numbers for each split it is opened on. */
+	public static final class UserGeneratingInputFormat extends GenericInputFormat<User> {
+		private static final long serialVersionUID = 1L;
+		private static final int NUM = 100;
+		private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };
+		private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };
+
+		private final Random rnd = new Random(32498562304986L);
+
+		private int count;
+
+		@Override
+		public void open(GenericInputSplit split) throws IOException {
+			super.open(split);
+			// restart per split: otherwise a second split opened on this instance would be empty
+			count = 0;
+		}
+
+		@Override
+		public boolean reachedEnd() throws IOException {
+			return count >= NUM;
+		}
+
+		@Override
+		public User nextRecord(User reuse) throws IOException {
+			count++;
+
+			User u = new User();
+			u.setName(NAMES[rnd.nextInt(NAMES.length)]);
+			u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
+			u.setFavoriteNumber(rnd.nextInt(87));
+			return u;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java
new file mode 100644
index 0000000..3394d60
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.io.avro.example;
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.io.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
+ public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+ @Deprecated public java.lang.CharSequence name;
+ @Deprecated public java.lang.Integer favorite_number;
+ @Deprecated public java.lang.CharSequence favorite_color;
+
+ /**
+ * Default constructor. Note that this does not initialize fields
+ * to their default values from the schema. If that is desired then
+ * one should use {@link \#newBuilder()}.
+ */
+ public User() {}
+
+ /**
+ * All-args constructor.
+ */
+ public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) {
+ this.name = name;
+ this.favorite_number = favorite_number;
+ this.favorite_color = favorite_color;
+ }
+
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call.
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0: return name;
+ case 1: return favorite_number;
+ case 2: return favorite_color;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+ // Used by DatumReader. Applications should not call.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value$) {
+ switch (field$) {
+ case 0: name = (java.lang.CharSequence)value$; break;
+ case 1: favorite_number = (java.lang.Integer)value$; break;
+ case 2: favorite_color = (java.lang.CharSequence)value$; break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets the value of the 'name' field.
+ */
+ public java.lang.CharSequence getName() {
+ return name;
+ }
+
+ /**
+ * Sets the value of the 'name' field.
+ * @param value the value to set.
+ */
+ public void setName(java.lang.CharSequence value) {
+ this.name = value;
+ }
+
+ /**
+ * Gets the value of the 'favorite_number' field.
+ */
+ public java.lang.Integer getFavoriteNumber() {
+ return favorite_number;
+ }
+
+ /**
+ * Sets the value of the 'favorite_number' field.
+ * @param value the value to set.
+ */
+ public void setFavoriteNumber(java.lang.Integer value) {
+ this.favorite_number = value;
+ }
+
+ /**
+ * Gets the value of the 'favorite_color' field.
+ */
+ public java.lang.CharSequence getFavoriteColor() {
+ return favorite_color;
+ }
+
+ /**
+ * Sets the value of the 'favorite_color' field.
+ * @param value the value to set.
+ */
+ public void setFavoriteColor(java.lang.CharSequence value) {
+ this.favorite_color = value;
+ }
+
+ /** Creates a new User RecordBuilder */
+ public static org.apache.flink.api.io.avro.example.User.Builder newBuilder() {
+ return new org.apache.flink.api.io.avro.example.User.Builder();
+ }
+
+ /** Creates a new User RecordBuilder by copying an existing Builder */
+ public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User.Builder other) {
+ return new org.apache.flink.api.io.avro.example.User.Builder(other);
+ }
+
+ /** Creates a new User RecordBuilder by copying an existing User instance */
+ public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User other) {
+ return new org.apache.flink.api.io.avro.example.User.Builder(other);
+ }
+
+ /**
+ * RecordBuilder for User instances.
+ */
+ public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
+ implements org.apache.avro.data.RecordBuilder<User> {
+
+ private java.lang.CharSequence name;
+ private java.lang.Integer favorite_number;
+ private java.lang.CharSequence favorite_color;
+
+ /** Creates a new Builder */
+ private Builder() {
+ super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
+ }
+
+ /** Creates a Builder by copying an existing Builder */
+ private Builder(org.apache.flink.api.io.avro.example.User.Builder other) {
+ super(other);
+ if (isValidValue(fields()[0], other.name)) {
+ this.name = data().deepCopy(fields()[0].schema(), other.name);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.favorite_number)) {
+ this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+ fieldSetFlags()[1] = true;
+ }
+ if (isValidValue(fields()[2], other.favorite_color)) {
+ this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+ fieldSetFlags()[2] = true;
+ }
+ }
+
+ /** Creates a Builder by copying an existing User instance */
+ private Builder(org.apache.flink.api.io.avro.example.User other) {
+ super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
+ if (isValidValue(fields()[0], other.name)) {
+ this.name = data().deepCopy(fields()[0].schema(), other.name);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.favorite_number)) {
+ this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+ fieldSetFlags()[1] = true;
+ }
+ if (isValidValue(fields()[2], other.favorite_color)) {
+ this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+ fieldSetFlags()[2] = true;
+ }
+ }
+
+ /** Gets the value of the 'name' field */
+ public java.lang.CharSequence getName() {
+ return name;
+ }
+
+ /** Sets the value of the 'name' field */
+ public org.apache.flink.api.io.avro.example.User.Builder setName(java.lang.CharSequence value) {
+ validate(fields()[0], value);
+ this.name = value;
+ fieldSetFlags()[0] = true;
+ return this;
+ }
+
+ /** Checks whether the 'name' field has been set */
+ public boolean hasName() {
+ return fieldSetFlags()[0];
+ }
+
+ /** Clears the value of the 'name' field */
+ public org.apache.flink.api.io.avro.example.User.Builder clearName() {
+ name = null;
+ fieldSetFlags()[0] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'favorite_number' field */
+ public java.lang.Integer getFavoriteNumber() {
+ return favorite_number;
+ }
+
+ /** Sets the value of the 'favorite_number' field */
+ public org.apache.flink.api.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) {
+ validate(fields()[1], value);
+ this.favorite_number = value;
+ fieldSetFlags()[1] = true;
+ return this;
+ }
+
+ /** Checks whether the 'favorite_number' field has been set */
+ public boolean hasFavoriteNumber() {
+ return fieldSetFlags()[1];
+ }
+
+ /** Clears the value of the 'favorite_number' field */
+ public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteNumber() {
+ favorite_number = null;
+ fieldSetFlags()[1] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'favorite_color' field */
+ public java.lang.CharSequence getFavoriteColor() {
+ return favorite_color;
+ }
+
+ /** Sets the value of the 'favorite_color' field */
+ public org.apache.flink.api.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) {
+ validate(fields()[2], value);
+ this.favorite_color = value;
+ fieldSetFlags()[2] = true;
+ return this;
+ }
+
+ /** Checks whether the 'favorite_color' field has been set */
+ public boolean hasFavoriteColor() {
+ return fieldSetFlags()[2];
+ }
+
+ /** Clears the value of the 'favorite_color' field */
+ public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteColor() {
+ favorite_color = null;
+ fieldSetFlags()[2] = false;
+ return this;
+ }
+
+ @Override
+ public User build() {
+ try {
+ User record = new User();
+ record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
+ record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
+ record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
+ return record;
+ } catch (Exception e) {
+ throw new org.apache.avro.AvroRuntimeException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
deleted file mode 100644
index a898f8d..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.java.record.io.FileInputFormat;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.ReflectionUtil;
-
-
-public class AvroInputFormat<E> extends FileInputFormat {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
-
-
- private final Class<? extends AvroBaseValue<E>> avroWrapperTypeClass;
-
- private final Class<E> avroValueType;
-
-
- private transient FileReader<E> dataFileReader;
-
- private transient E reuseAvroValue;
-
- private transient AvroBaseValue<E> wrapper;
-
-
- public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass) {
- this.avroWrapperTypeClass = wrapperClass;
- this.avroValueType = ReflectionUtil.getTemplateType1(wrapperClass);
- this.unsplittable = true;
- }
-
- public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass, Class<E> avroType) {
- this.avroValueType = avroType;
- this.avroWrapperTypeClass = wrapperClass;
- this.unsplittable = true;
- }
-
- @Override
- public void open(FileInputSplit split) throws IOException {
- super.open(split);
-
- this.wrapper = InstantiationUtil.instantiate(avroWrapperTypeClass, AvroBaseValue.class);
-
- DatumReader<E> datumReader;
- if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
- datumReader = new SpecificDatumReader<E>(avroValueType);
- } else {
- datumReader = new ReflectDatumReader<E>(avroValueType);
- }
-
- LOG.info("Opening split " + split);
-
- SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());
-
- dataFileReader = DataFileReader.openReader(in, datumReader);
- dataFileReader.sync(split.getStart());
-
- reuseAvroValue = null;
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- return !dataFileReader.hasNext();
- }
-
- @Override
- public Record nextRecord(Record record) throws IOException {
- if (!dataFileReader.hasNext()) {
- return null;
- }
-
- reuseAvroValue = dataFileReader.next(reuseAvroValue);
- wrapper.datum(reuseAvroValue);
- record.setField(0, wrapper);
- return record;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
deleted file mode 100644
index 1639ec4..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.avro;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.java.record.io.FileInputFormat;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.ListValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.MapValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-
-/**
- * Input format to read Avro files.
- *
- * The input format currently supports only flat avro schemas. So there is no
- * support for complex types except for nullable primitve fields, e.g.
- * ["string", null] (See
- * http://avro.apache.org/docs/current/spec.html#schema_complex)
- *
- */
-public class AvroRecordInputFormat extends FileInputFormat {
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(AvroRecordInputFormat.class);
-
- private FileReader<GenericRecord> dataFileReader;
- private GenericRecord reuseAvroRecord = null;
-
- @Override
- public void open(FileInputSplit split) throws IOException {
- super.open(split);
- DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
- SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());
- LOG.info("Opening split " + split);
- dataFileReader = DataFileReader.openReader(in, datumReader);
- dataFileReader.sync(split.getStart());
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- return !dataFileReader.hasNext();
- }
-
- @Override
- public Record nextRecord(Record record) throws IOException {
- if (!dataFileReader.hasNext()) {
- return null;
- }
- if (record == null) {
- throw new IllegalArgumentException("Empty PactRecord given");
- }
- reuseAvroRecord = dataFileReader.next(reuseAvroRecord);
- final List<Field> fields = reuseAvroRecord.getSchema().getFields();
- for (Field field : fields) {
- final Value value = convertAvroToPactValue(field, reuseAvroRecord.get(field.pos()));
- record.setField(field.pos(), value);
- record.updateBinaryRepresenation();
- }
-
- return record;
- }
-
-
- @SuppressWarnings("unchecked")
- private final Value convertAvroToPactValue(final Field field, final Object avroRecord) {
- if (avroRecord == null) {
- return null;
- }
- final Type type = checkTypeConstraintsAndGetType(field.schema());
-
- // check for complex types
- // (complex type FIXED is not yet supported)
- switch (type) {
- case ARRAY:
- final Type elementType = field.schema().getElementType().getType();
- final List<?> avroList = (List<?>) avroRecord;
- return convertAvroArrayToListValue(elementType, avroList);
- case ENUM:
- final List<String> symbols = field.schema().getEnumSymbols();
- final String avroRecordString = avroRecord.toString();
- if (!symbols.contains(avroRecordString)) {
- throw new RuntimeException("The given Avro file contains field with a invalid enum symbol");
- }
- sString.setValue(avroRecordString);
- return sString;
- case MAP:
- final Type valueType = field.schema().getValueType().getType();
- final Map<CharSequence, ?> avroMap = (Map<CharSequence, ?>) avroRecord;
- return convertAvroMapToMapValue(valueType, avroMap);
-
- // primitive type
- default:
- return convertAvroPrimitiveToValue(type, avroRecord);
-
- }
- }
-
- private final ListValue<?> convertAvroArrayToListValue(Type elementType, List<?> avroList) {
- switch (elementType) {
- case STRING:
- StringListValue sl = new StringListValue();
- for (Object item : avroList) {
- sl.add(new StringValue((CharSequence) item));
- }
- return sl;
- case INT:
- IntListValue il = new IntListValue();
- for (Object item : avroList) {
- il.add(new IntValue((Integer) item));
- }
- return il;
- case BOOLEAN:
- BooleanListValue bl = new BooleanListValue();
- for (Object item : avroList) {
- bl.add(new BooleanValue((Boolean) item));
- }
- return bl;
- case DOUBLE:
- DoubleListValue dl = new DoubleListValue();
- for (Object item : avroList) {
- dl.add(new DoubleValue((Double) item));
- }
- return dl;
- case FLOAT:
- FloatListValue fl = new FloatListValue();
- for (Object item : avroList) {
- fl.add(new FloatValue((Float) item));
- }
- return fl;
- case LONG:
- LongListValue ll = new LongListValue();
- for (Object item : avroList) {
- ll.add(new LongValue((Long) item));
- }
- return ll;
- default:
- throw new RuntimeException("Elements of type " + elementType + " are not supported for Avro arrays.");
- }
- }
-
- private final MapValue<StringValue, ?> convertAvroMapToMapValue(Type mapValueType, Map<CharSequence, ?> avroMap) {
- switch (mapValueType) {
- case STRING:
- StringMapValue sm = new StringMapValue();
- for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
- sm.put(new StringValue((CharSequence) entry.getKey()), new StringValue((String) entry.getValue()));
- }
- return sm;
- case INT:
- IntMapValue im = new IntMapValue();
- for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
- im.put(new StringValue((CharSequence) entry.getKey()), new IntValue((Integer) entry.getValue()));
- }
- return im;
- case BOOLEAN:
- BooleanMapValue bm = new BooleanMapValue();
- for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
- bm.put(new StringValue((CharSequence) entry.getKey()), new BooleanValue((Boolean) entry.getValue()));
- }
- return bm;
- case DOUBLE:
- DoubleMapValue dm = new DoubleMapValue();
- for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
- dm.put(new StringValue((CharSequence) entry.getKey()), new DoubleValue((Double) entry.getValue()));
- }
- return dm;
- case FLOAT:
- FloatMapValue fm = new FloatMapValue();
- for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
- fm.put(new StringValue((CharSequence) entry.getKey()), new FloatValue((Float) entry.getValue()));
- }
- return fm;
- case LONG:
- LongMapValue lm = new LongMapValue();
- for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
- lm.put(new StringValue((CharSequence) entry.getKey()), new LongValue((Long) entry.getValue()));
- }
- return lm;
-
- default:
- throw new RuntimeException("Map values of type " + mapValueType + " are not supported for Avro map.");
- }
- }
-
- private StringValue sString = new StringValue();
- private IntValue sInt = new IntValue();
- private BooleanValue sBool = new BooleanValue();
- private DoubleValue sDouble = new DoubleValue();
- private FloatValue sFloat = new FloatValue();
- private LongValue sLong = new LongValue();
-
- private final Value convertAvroPrimitiveToValue(Type type, Object avroRecord) {
- switch (type) {
- case STRING:
- sString.setValue((CharSequence) avroRecord);
- return sString;
- case INT:
- sInt.setValue((Integer) avroRecord);
- return sInt;
- case BOOLEAN:
- sBool.setValue((Boolean) avroRecord);
- return sBool;
- case DOUBLE:
- sDouble.setValue((Double) avroRecord);
- return sDouble;
- case FLOAT:
- sFloat.setValue((Float) avroRecord);
- return sFloat;
- case LONG:
- sLong.setValue((Long) avroRecord);
- return sLong;
- case NULL:
- return NullValue.getInstance();
- default:
- throw new RuntimeException(
- "Type "
- + type
- + " for AvroInputFormat is not implemented. Open an issue on GitHub.");
- }
- }
-
- private final Type checkTypeConstraintsAndGetType(final Schema schema) {
- final Type type = schema.getType();
- if (type == Type.RECORD) {
- throw new RuntimeException("The given Avro file contains complex data types which are not supported right now");
- }
-
- if (type == Type.UNION) {
- List<Schema> types = schema.getTypes();
- if (types.size() > 2) {
- throw new RuntimeException("The given Avro file contains a union that has more than two elements");
- }
- if (types.size() == 1 && types.get(0).getType() != Type.UNION) {
- return types.get(0).getType();
- }
- if (types.get(0).getType() == Type.UNION || types.get(1).getType() == Type.UNION) {
- throw new RuntimeException("The given Avro file contains a nested union");
- }
- if (types.get(0).getType() == Type.NULL) {
- return types.get(1).getType();
- } else {
- if (types.get(1).getType() != Type.NULL) {
- throw new RuntimeException("The given Avro file is contains a union with two non-null types.");
- }
- return types.get(0).getType();
- }
- }
- return type;
- }
-
- /**
- * Set minNumSplits to number of files.
- */
- @Override
- public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
- int numAvroFiles = 0;
- final Path path = this.filePath;
- // get all the files that are involved in the splits
- final FileSystem fs = path.getFileSystem();
- final FileStatus pathFile = fs.getFileStatus(path);
-
- if (!acceptFile(pathFile)) {
- throw new IOException("The given file does not pass the file-filter");
- }
- if (pathFile.isDir()) {
- // input is directory. list all contained files
- final FileStatus[] dir = fs.listStatus(path);
- for (int i = 0; i < dir.length; i++) {
- if (!dir[i].isDir() && acceptFile(dir[i])) {
- numAvroFiles++;
- }
- }
- } else {
- numAvroFiles = 1;
- }
- return super.createInputSplits(numAvroFiles);
- }
-
- // --------------------------------------------------------------------------------------------
- // Concrete subclasses of ListValue and MapValue for all possible primitive types
- // --------------------------------------------------------------------------------------------
-
- public static class StringListValue extends ListValue<StringValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class IntListValue extends ListValue<IntValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class BooleanListValue extends ListValue<BooleanValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class DoubleListValue extends ListValue<DoubleValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class FloatListValue extends ListValue<FloatValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class LongListValue extends ListValue<LongValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class StringMapValue extends MapValue<StringValue, StringValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class IntMapValue extends MapValue<StringValue, IntValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class BooleanMapValue extends MapValue<StringValue, BooleanValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class DoubleMapValue extends MapValue<StringValue, DoubleValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class FloatMapValue extends MapValue<StringValue, FloatValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class LongMapValue extends MapValue<StringValue, LongValue> {
- private static final long serialVersionUID = 1L;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
deleted file mode 100644
index d6d1213..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.avro.example;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.Random;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-
-public class ReflectiveAvroTypeExample {
-
-
- public static void main(String[] args) throws Exception {
-
- GenericDataSource<UserGeneratingInputFormat> source = new GenericDataSource<UserGeneratingInputFormat>(UserGeneratingInputFormat.class);
-
- MapOperator mapper = MapOperator.builder(new NumberExtractingMapper())
- .input(source).name("le mapper").build();
-
- ReduceOperator reducer = ReduceOperator.builder(new ConcatenatingReducer(), IntValue.class, 1)
- .input(mapper).name("le reducer").build();
-
- GenericDataSink sink = new GenericDataSink(PrintingOutputFormat.class, reducer);
-
- Plan p = new Plan(sink);
- p.setDefaultParallelism(4);
-
- LocalExecutor.execute(p);
- }
-
-
- public static final class NumberExtractingMapper extends MapFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record record, Collector<Record> out) throws Exception {
- User u = record.getField(0, SUser.class).datum();
- record.setField(1, new IntValue(u.getFavoriteNumber()));
- out.collect(record);
- }
- }
-
-
- public static final class ConcatenatingReducer extends ReduceFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final Record result = new Record(2);
-
- @Override
- public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
- Record r = records.next();
-
- int num = r.getField(1, IntValue.class).getValue();
- String names = r.getField(0, SUser.class).datum().getFavoriteColor().toString();
-
- while (records.hasNext()) {
- r = records.next();
- names += " - " + r.getField(0, SUser.class).datum().getFavoriteColor().toString();
- }
-
- result.setField(0, new IntValue(num));
- result.setField(1, new StringValue(names));
- out.collect(result);
- }
-
- }
-
-
- public static final class UserGeneratingInputFormat extends GenericInputFormat {
-
- private static final long serialVersionUID = 1L;
-
- private static final int NUM = 100;
-
- private final Random rnd = new Random(32498562304986L);
-
- private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };
-
- private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };
-
- private int count;
-
-
- @Override
- public boolean reachedEnd() throws IOException {
- return count >= NUM;
- }
-
- @Override
- public Record nextRecord(Record record) throws IOException {
- count++;
-
- User u = new User();
- u.setName(NAMES[rnd.nextInt(NAMES.length)]);
- u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
- u.setFavoriteNumber(rnd.nextInt(87));
-
- SUser su = new SUser();
- su.datum(u);
-
- record.setField(0, su);
- return record;
- }
- }
-
- public static final class PrintingOutputFormat implements OutputFormat<Record> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void configure(Configuration parameters) {}
-
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {}
-
- @Override
- public void writeRecord(Record record) throws IOException {
- int color = record.getField(0, IntValue.class).getValue();
- String names = record.getField(1, StringValue.class).getValue();
-
- System.out.println(color + ": " + names);
- }
-
- @Override
- public void close() throws IOException {}
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
deleted file mode 100644
index 542639e..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.avro.example;
-
-import org.apache.flink.api.avro.AvroBaseValue;
-
-public class SUser extends AvroBaseValue<User> {
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
deleted file mode 100644
index d4fb292..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- *
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.java.record.io.avro.example;
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
- public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
- public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
- @Deprecated public java.lang.CharSequence name;
- @Deprecated public java.lang.Integer favorite_number;
- @Deprecated public java.lang.CharSequence favorite_color;
-
- /**
- * Default constructor. Note that this does not initialize fields
- * to their default values from the schema. If that is desired then
- * one should use {@link \#newBuilder()}.
- */
- public User() {}
-
- /**
- * All-args constructor.
- */
- public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) {
- this.name = name;
- this.favorite_number = favorite_number;
- this.favorite_color = favorite_color;
- }
-
- public org.apache.avro.Schema getSchema() { return SCHEMA$; }
- // Used by DatumWriter. Applications should not call.
- public java.lang.Object get(int field$) {
- switch (field$) {
- case 0: return name;
- case 1: return favorite_number;
- case 2: return favorite_color;
- default: throw new org.apache.avro.AvroRuntimeException("Bad index");
- }
- }
- // Used by DatumReader. Applications should not call.
- @SuppressWarnings(value="unchecked")
- public void put(int field$, java.lang.Object value$) {
- switch (field$) {
- case 0: name = (java.lang.CharSequence)value$; break;
- case 1: favorite_number = (java.lang.Integer)value$; break;
- case 2: favorite_color = (java.lang.CharSequence)value$; break;
- default: throw new org.apache.avro.AvroRuntimeException("Bad index");
- }
- }
-
- /**
- * Gets the value of the 'name' field.
- */
- public java.lang.CharSequence getName() {
- return name;
- }
-
- /**
- * Sets the value of the 'name' field.
- * @param value the value to set.
- */
- public void setName(java.lang.CharSequence value) {
- this.name = value;
- }
-
- /**
- * Gets the value of the 'favorite_number' field.
- */
- public java.lang.Integer getFavoriteNumber() {
- return favorite_number;
- }
-
- /**
- * Sets the value of the 'favorite_number' field.
- * @param value the value to set.
- */
- public void setFavoriteNumber(java.lang.Integer value) {
- this.favorite_number = value;
- }
-
- /**
- * Gets the value of the 'favorite_color' field.
- */
- public java.lang.CharSequence getFavoriteColor() {
- return favorite_color;
- }
-
- /**
- * Sets the value of the 'favorite_color' field.
- * @param value the value to set.
- */
- public void setFavoriteColor(java.lang.CharSequence value) {
- this.favorite_color = value;
- }
-
- /** Creates a new User RecordBuilder */
- public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder() {
- return new org.apache.flink.api.java.record.io.avro.example.User.Builder();
- }
-
- /** Creates a new User RecordBuilder by copying an existing Builder */
- public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
- return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
- }
-
- /** Creates a new User RecordBuilder by copying an existing User instance */
- public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User other) {
- return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
- }
-
- /**
- * RecordBuilder for User instances.
- */
- public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
- implements org.apache.avro.data.RecordBuilder<User> {
-
- private java.lang.CharSequence name;
- private java.lang.Integer favorite_number;
- private java.lang.CharSequence favorite_color;
-
- /** Creates a new Builder */
- private Builder() {
- super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
- }
-
- /** Creates a Builder by copying an existing Builder */
- private Builder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
- super(other);
- if (isValidValue(fields()[0], other.name)) {
- this.name = data().deepCopy(fields()[0].schema(), other.name);
- fieldSetFlags()[0] = true;
- }
- if (isValidValue(fields()[1], other.favorite_number)) {
- this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
- fieldSetFlags()[1] = true;
- }
- if (isValidValue(fields()[2], other.favorite_color)) {
- this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
- fieldSetFlags()[2] = true;
- }
- }
-
- /** Creates a Builder by copying an existing User instance */
- private Builder(org.apache.flink.api.java.record.io.avro.example.User other) {
- super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
- if (isValidValue(fields()[0], other.name)) {
- this.name = data().deepCopy(fields()[0].schema(), other.name);
- fieldSetFlags()[0] = true;
- }
- if (isValidValue(fields()[1], other.favorite_number)) {
- this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
- fieldSetFlags()[1] = true;
- }
- if (isValidValue(fields()[2], other.favorite_color)) {
- this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
- fieldSetFlags()[2] = true;
- }
- }
-
- /** Gets the value of the 'name' field */
- public java.lang.CharSequence getName() {
- return name;
- }
-
- /** Sets the value of the 'name' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder setName(java.lang.CharSequence value) {
- validate(fields()[0], value);
- this.name = value;
- fieldSetFlags()[0] = true;
- return this;
- }
-
- /** Checks whether the 'name' field has been set */
- public boolean hasName() {
- return fieldSetFlags()[0];
- }
-
- /** Clears the value of the 'name' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder clearName() {
- name = null;
- fieldSetFlags()[0] = false;
- return this;
- }
-
- /** Gets the value of the 'favorite_number' field */
- public java.lang.Integer getFavoriteNumber() {
- return favorite_number;
- }
-
- /** Sets the value of the 'favorite_number' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) {
- validate(fields()[1], value);
- this.favorite_number = value;
- fieldSetFlags()[1] = true;
- return this;
- }
-
- /** Checks whether the 'favorite_number' field has been set */
- public boolean hasFavoriteNumber() {
- return fieldSetFlags()[1];
- }
-
- /** Clears the value of the 'favorite_number' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteNumber() {
- favorite_number = null;
- fieldSetFlags()[1] = false;
- return this;
- }
-
- /** Gets the value of the 'favorite_color' field */
- public java.lang.CharSequence getFavoriteColor() {
- return favorite_color;
- }
-
- /** Sets the value of the 'favorite_color' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) {
- validate(fields()[2], value);
- this.favorite_color = value;
- fieldSetFlags()[2] = true;
- return this;
- }
-
- /** Checks whether the 'favorite_color' field has been set */
- public boolean hasFavoriteColor() {
- return fieldSetFlags()[2];
- }
-
- /** Clears the value of the 'favorite_color' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteColor() {
- favorite_color = null;
- fieldSetFlags()[2] = false;
- return this;
- }
-
- @Override
- public User build() {
- try {
- User record = new User();
- record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
- record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
- record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
- return record;
- } catch (Exception e) {
- throw new org.apache.avro.AvroRuntimeException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
index a60787d..db235c0 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
@@ -28,11 +28,11 @@ import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.io.avro.example.User;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.record.io.avro.example.User;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.util.JavaProgramTestBase;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
deleted file mode 100644
index a3850e2..0000000
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.avro.reflect.Nullable;
-import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-public class AvroWithEmptyArrayITCase extends RecordAPITestBase {
-
- @Override
- protected Plan getTestJob() {
- GenericDataSource<RandomInputFormat> bookSource = new GenericDataSource<RandomInputFormat>(
- new RandomInputFormat(true));
- GenericDataSource<RandomInputFormat> authorSource = new GenericDataSource<RandomInputFormat>(
- new RandomInputFormat(false));
-
- CoGroupOperator coGroupOperator = CoGroupOperator.builder(MyCoGrouper.class, LongValue.class, 0, 0)
- .input1(bookSource).input2(authorSource).name("CoGrouper Test").build();
-
- GenericDataSink sink = new GenericDataSink(PrintingOutputFormat.class, coGroupOperator);
-
- Plan plan = new Plan(sink, "CoGroper Test Plan");
- plan.setDefaultParallelism(1);
- return plan;
- }
-
- public static class SBookAvroValue extends AvroBaseValue<Book> {
- private static final long serialVersionUID = 1L;
-
- public SBookAvroValue() {}
-
- public SBookAvroValue(Book datum) {
- super(datum);
- }
- }
-
- public static class Book {
-
- long bookId;
- @Nullable
- String title;
- long authorId;
-
- public Book() {
- }
-
- public Book(long bookId, String title, long authorId) {
- this.bookId = bookId;
- this.title = title;
- this.authorId = authorId;
- }
- }
-
- public static class SBookAuthorValue extends AvroBaseValue<BookAuthor> {
- private static final long serialVersionUID = 1L;
-
- public SBookAuthorValue() {}
-
- public SBookAuthorValue(BookAuthor datum) {
- super(datum);
- }
- }
-
- public static class BookAuthor {
-
- enum BookType {
- book,
- article,
- journal
- }
-
- long authorId;
-
- @Nullable
- List<String> bookTitles;
-
- @Nullable
- List<Book> books;
-
- String authorName;
-
- BookType bookType;
-
- public BookAuthor() {}
-
- public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
- this.authorId = authorId;
- this.bookTitles = bookTitles;
- this.authorName = authorName;
- }
- }
-
- public static class RandomInputFormat extends GenericInputFormat {
- private static final long serialVersionUID = 1L;
-
- private final boolean isBook;
-
- private boolean touched = false;
-
- public RandomInputFormat(boolean isBook) {
- this.isBook = isBook;
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- return touched;
- }
-
- @Override
- public Record nextRecord(Record record) throws IOException {
- touched = true;
- record.setField(0, new LongValue(26382648));
-
- if (isBook) {
- Book b = new Book(123, "This is a test book", 26382648);
- record.setField(1, new SBookAvroValue(b));
- } else {
- List<String> titles = new ArrayList<String>();
- // titles.add("Title1");
- // titles.add("Title2");
- // titles.add("Title3");
-
- List<Book> books = new ArrayList<Book>();
- books.add(new Book(123, "This is a test book", 1));
- books.add(new Book(24234234, "This is a test book", 1));
- books.add(new Book(1234324, "This is a test book", 3));
-
- BookAuthor a = new BookAuthor(1, titles, "Test Author");
- a.books = books;
- a.bookType = BookAuthor.BookType.journal;
- record.setField(1, new SBookAuthorValue(a));
- }
-
- return record;
- }
- }
-
- public static final class PrintingOutputFormat implements OutputFormat<Record> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void configure(Configuration parameters) {}
-
- @Override
- public void open(int taskNumber, int numTasks) {}
-
- @Override
- public void writeRecord(Record record) throws IOException {
- long key = record.getField(0, LongValue.class).getValue();
- String val = record.getField(1, StringValue.class).getValue();
- System.out.println(key + " : " + val);
- }
-
- @Override
- public void close() {}
- }
-
- public static class MyCoGrouper extends CoGroupFunction {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
-
- Record r1 = null;
- while (records1.hasNext()) {
- r1 = records1.next();
- }
- Record r2 = null;
- while (records2.hasNext()) {
- r2 = records2.next();
- }
-
- if (r1 != null) {
- r1.getField(1, SBookAvroValue.class).datum();
- }
-
- if (r2 != null) {
- r2.getField(1, SBookAuthorValue.class).datum();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
index 78ff2f1..74b5397 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
@@ -31,8 +31,8 @@ import java.util.Random;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.java.record.io.avro.generated.Colors;
-import org.apache.flink.api.java.record.io.avro.generated.User;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.User;
import org.apache.flink.util.StringUtils;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
index 31e083f..25e2e0c 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
@@ -39,7 +39,6 @@ import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.avro.AvroBaseValue;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -125,18 +124,6 @@ public class AvroExternalJarProgram {
}
}
-
- public static final class SUser extends AvroBaseValue<MyUser> {
-
- static final long serialVersionUID = 1L;
-
- public SUser() {}
-
- public SUser(MyUser u) {
- super(u);
- }
- }
-
// --------------------------------------------------------------------------------------------
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
new file mode 100644
index 0000000..d8d8b46
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.io.avro;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests {@link AvroInputFormat} by writing two {@link User} records to a temporary
+ * Avro data file and reading them back.
+ * The test data mostly follows the Avro "Getting Started (Java)" tutorial:
+ * http://avro.apache.org/docs/current/gettingstartedjava.html
+ */
+public class AvroRecordInputFormatTest {
+
+	/** Temporary Avro data file; written in {@link #createFiles()}, removed in {@link #deleteFiles()}. */
+	private File testFile;
+
+	final static String TEST_NAME = "Alyssa";
+
+	final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
+	final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
+
+	final static boolean TEST_ARRAY_BOOLEAN_1 = true;
+	final static boolean TEST_ARRAY_BOOLEAN_2 = false;
+
+	final static Colors TEST_ENUM_COLOR = Colors.GREEN;
+
+	final static String TEST_MAP_KEY1 = "KEY 1";
+	final static long TEST_MAP_VALUE1 = 8546456L;
+	final static String TEST_MAP_KEY2 = "KEY 2";
+	final static long TEST_MAP_VALUE2 = 17554L;
+
+	/** Name of the second record, which is constructed via the generated builder. */
+	final static String TEST_NAME_2 = "Charlie";
+
+	/**
+	 * Writes two users to a fresh temporary Avro data file: the first is populated via
+	 * setters (non-empty arrays and map), the second via {@link User#newBuilder()}
+	 * (empty collections, nulls in the nullable fields).
+	 */
+	@Before
+	public void createFiles() throws IOException {
+		testFile = File.createTempFile("AvroInputFormatTest", null);
+		// safety net in case deleteFiles() is never reached
+		testFile.deleteOnExit();
+
+		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
+		stringArray.add(TEST_ARRAY_STRING_1);
+		stringArray.add(TEST_ARRAY_STRING_2);
+
+		ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
+		booleanArray.add(TEST_ARRAY_BOOLEAN_1);
+		booleanArray.add(TEST_ARRAY_BOOLEAN_2);
+
+		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
+		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
+		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
+
+		// Construct via setters
+		User user1 = new User();
+		user1.setName(TEST_NAME);
+		user1.setFavoriteNumber(256);
+		user1.setTypeDoubleTest(123.45d);
+		user1.setTypeBoolTest(true);
+		user1.setTypeArrayString(stringArray);
+		user1.setTypeArrayBoolean(booleanArray);
+		user1.setTypeEnum(TEST_ENUM_COLOR);
+		user1.setTypeMap(longMap);
+
+		// Construct via builder
+		User user2 = User.newBuilder()
+				.setName(TEST_NAME_2)
+				.setFavoriteColor("blue")
+				.setFavoriteNumber(null)
+				.setTypeBoolTest(false)
+				.setTypeDoubleTest(1.337d)
+				.setTypeNullTest(null)
+				.setTypeLongTest(1337L)
+				.setTypeArrayString(new ArrayList<CharSequence>())
+				.setTypeArrayBoolean(new ArrayList<Boolean>())
+				.setTypeNullableArray(null)
+				.setTypeEnum(Colors.RED)
+				.setTypeMap(new HashMap<CharSequence, Long>())
+				.build();
+
+		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
+		DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
+		try {
+			dataFileWriter.create(user1.getSchema(), testFile);
+			dataFileWriter.append(user1);
+			dataFileWriter.append(user2);
+		} finally {
+			// release the file handle even if writing fails
+			dataFileWriter.close();
+		}
+	}
+
+	/**
+	 * Reads the file back through {@link AvroInputFormat} and checks the fields of the
+	 * first record, the presence of the second record, and the end-of-input behavior.
+	 */
+	@Test
+	public void testDeserialisation() throws IOException {
+		Configuration parameters = new Configuration();
+
+		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+
+		format.configure(parameters);
+		FileInputSplit[] splits = format.createInputSplits(1);
+		// JUnit's argument order is (message, expected, actual)
+		assertEquals("number of splits", 1, splits.length);
+		format.open(splits[0]);
+
+		try {
+			User u = format.nextRecord(null);
+			assertNotNull(u);
+
+			// check for null before dereferencing, so a missing name fails with a message
+			assertNotNull("empty record", u.getName());
+			String name = u.getName().toString();
+			assertEquals("name not equal", TEST_NAME, name);
+
+			// check arrays
+			List<CharSequence> sl = u.getTypeArrayString();
+			assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
+			assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
+
+			List<Boolean> bl = u.getTypeArrayBoolean();
+			assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
+			assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+
+			// check enums
+			Colors enumValue = u.getTypeEnum();
+			assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
+
+			// check maps (keys are looked up as Utf8, the type the read-back map keys have here)
+			Map<CharSequence, Long> lm = u.getTypeMap();
+			assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
+			assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+
+			// the second record was written via the builder
+			assertFalse("expecting second element", format.reachedEnd());
+			User u2 = format.nextRecord(u);
+			assertNotNull("expecting second element", u2);
+			assertEquals("name of second element not equal", TEST_NAME_2, u2.getName().toString());
+
+			// no records beyond the second one
+			assertNull(format.nextRecord(u2));
+			assertTrue(format.reachedEnd());
+		} finally {
+			format.close();
+		}
+	}
+
+	/** Removes the temporary file created by {@link #createFiles()}, if it was created. */
+	@After
+	public void deleteFiles() {
+		if (testFile != null) {
+			testFile.delete();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
new file mode 100644
index 0000000..58e1f5c
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.io.avro.generated;
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public enum Colors {
+ RED, GREEN, BLUE ;
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Colors\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}");
+ public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+}