You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/12 14:48:50 UTC
[71/73] [abbrv] prefix all projects in addons and quickstarts with
flink-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
deleted file mode 100644
index 4a6e7f1..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.File;
-import java.net.InetSocketAddress;
-
-import org.apache.flink.client.minicluster.NepheleMiniCluster;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.LogUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class AvroExternalJarProgramITCase {
-
- private static final int TEST_JM_PORT = 43191;
-
- private static final String JAR_FILE = "target/maven-test-jar.jar";
-
- private static final String TEST_DATA_FILE = "/testdata.avro";
-
- static {
- LogUtils.initializeDefaultTestConsoleLogger();
- }
-
- @Test
- public void testExternalProgram() {
-
- NepheleMiniCluster testMiniCluster = null;
-
- try {
- testMiniCluster = new NepheleMiniCluster();
- testMiniCluster.setJobManagerRpcPort(TEST_JM_PORT);
- testMiniCluster.setTaskManagerNumSlots(4);
- testMiniCluster.start();
-
- String jarFile = JAR_FILE;
- String testData = getClass().getResource(TEST_DATA_FILE).toString();
-
- PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
-
- Client c = new Client(new InetSocketAddress("localhost", TEST_JM_PORT), new Configuration());
- c.run(program, 4, true);
- }
- catch (Throwable t) {
- System.err.println(t.getMessage());
- t.printStackTrace();
- Assert.fail("Error during the packaged program execution: " + t.getMessage());
- }
- finally {
- if (testMiniCluster != null) {
- try {
- testMiniCluster.stop();
- } catch (Throwable t) {}
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
deleted file mode 100644
index 637a5e9..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import org.junit.Assert;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.record.io.avro.example.User;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-@SuppressWarnings("serial")
-public class AvroOutputFormatTest extends JavaProgramTestBase {
-
- public static String outputPath1;
-
- public static String outputPath2;
-
- public static String inputPath;
-
- public static String userData = "alice|1|blue\n" +
- "bob|2|red\n" +
- "john|3|yellow\n" +
- "walt|4|black\n";
-
- @Override
- protected void preSubmit() throws Exception {
- inputPath = createTempFile("user", userData);
- outputPath1 = getTempDirPath("avro_output1");
- outputPath2 = getTempDirPath("avro_output2");
- }
-
-
- @Override
- protected void testProgram() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
- .fieldDelimiter('|')
- .types(String.class, Integer.class, String.class);
-
- //output the data with AvroOutputFormat for specific user type
- DataSet<User> specificUser = input.map(new ConvertToUser());
- specificUser.write(new AvroOutputFormat<User>(User.class), outputPath1);
-
- //output the data with AvroOutputFormat for reflect user type
- DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
- reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
-
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- //compare result for specific user type
- File [] output1;
- File file1 = asFile(outputPath1);
- if (file1.isDirectory()) {
- output1 = file1.listFiles();
- } else {
- output1 = new File[] {file1};
- }
- List<String> result1 = new ArrayList<String>();
- DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
- for (File avroOutput : output1) {
- DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1);
- while (dataFileReader1.hasNext()) {
- User user = dataFileReader1.next();
- result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
- }
- }
- for (String expectedResult : userData.split("\n")) {
- Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult));
- }
-
- //compare result for reflect user type
- File [] output2;
- File file2 = asFile(outputPath2);
- if (file2.isDirectory()) {
- output2 = file2.listFiles();
- } else {
- output2 = new File[] {file2};
- }
- List<String> result2 = new ArrayList<String>();
- DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
- for (File avroOutput : output2) {
- DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2);
- while (dataFileReader2.hasNext()) {
- ReflectiveUser user = dataFileReader2.next();
- result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
- }
- }
- for (String expectedResult : userData.split("\n")) {
- Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult));
- }
-
-
- }
-
-
- public final static class ConvertToUser extends MapFunction<Tuple3<String, Integer, String>, User> {
-
- @Override
- public User map(Tuple3<String, Integer, String> value) throws Exception {
- return new User(value.f0, value.f1, value.f2);
- }
- }
-
- public final static class ConvertToReflective extends MapFunction<User, ReflectiveUser> {
-
- @Override
- public ReflectiveUser map(User value) throws Exception {
- return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
- }
- }
-
-
- public static class ReflectiveUser {
- private String name;
- private int favoriteNumber;
- private String favoriteColor;
-
- public ReflectiveUser() {}
-
- public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
- this.name = name;
- this.favoriteNumber = favoriteNumber;
- this.favoriteColor = favoriteColor;
- }
-
- public String getName() {
- return this.name;
- }
- public String getFavoriteColor() {
- return this.favoriteColor;
- }
- public int getFavoriteNumber() {
- return this.favoriteNumber;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java b/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
deleted file mode 100644
index ea9edff..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.avro.reflect.Nullable;
-import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-public class AvroWithEmptyArrayITCase extends RecordAPITestBase {
-
- @Override
- protected Plan getTestJob() {
- GenericDataSource<RandomInputFormat> bookSource = new GenericDataSource<RandomInputFormat>(
- new RandomInputFormat(true));
- GenericDataSource<RandomInputFormat> authorSource = new GenericDataSource<RandomInputFormat>(
- new RandomInputFormat(false));
-
- CoGroupOperator coGroupOperator = CoGroupOperator.builder(MyCoGrouper.class, LongValue.class, 0, 0)
- .input1(bookSource).input2(authorSource).name("CoGrouper Test").build();
-
- GenericDataSink sink = new GenericDataSink(PrintingOutputFormat.class, coGroupOperator);
-
- Plan plan = new Plan(sink, "CoGroper Test Plan");
- plan.setDefaultParallelism(1);
- return plan;
- }
-
- public static class SBookAvroValue extends AvroBaseValue<Book> {
- private static final long serialVersionUID = 1L;
-
- public SBookAvroValue() {}
-
- public SBookAvroValue(Book datum) {
- super(datum);
- }
- }
-
- public static class Book {
-
- long bookId;
- @Nullable
- String title;
- long authorId;
-
- public Book() {
- }
-
- public Book(long bookId, String title, long authorId) {
- this.bookId = bookId;
- this.title = title;
- this.authorId = authorId;
- }
- }
-
- public static class SBookAuthorValue extends AvroBaseValue<BookAuthor> {
- private static final long serialVersionUID = 1L;
-
- public SBookAuthorValue() {}
-
- public SBookAuthorValue(BookAuthor datum) {
- super(datum);
- }
- }
-
- public static class BookAuthor {
-
- enum BookType {
- book,
- article,
- journal
- }
-
- long authorId;
-
- @Nullable
- List<String> bookTitles;
-
- @Nullable
- List<Book> books;
-
- String authorName;
-
- BookType bookType;
-
- public BookAuthor() {}
-
- public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
- this.authorId = authorId;
- this.bookTitles = bookTitles;
- this.authorName = authorName;
- }
- }
-
- public static class RandomInputFormat extends GenericInputFormat {
- private static final long serialVersionUID = 1L;
-
- private final boolean isBook;
-
- private boolean touched = false;
-
- public RandomInputFormat(boolean isBook) {
- this.isBook = isBook;
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- return touched;
- }
-
- @Override
- public Record nextRecord(Record record) throws IOException {
- touched = true;
- record.setField(0, new LongValue(26382648));
-
- if (isBook) {
- Book b = new Book(123, "This is a test book", 26382648);
- record.setField(1, new SBookAvroValue(b));
- } else {
- List<String> titles = new ArrayList<String>();
- // titles.add("Title1");
- // titles.add("Title2");
- // titles.add("Title3");
-
- List<Book> books = new ArrayList<Book>();
- books.add(new Book(123, "This is a test book", 1));
- books.add(new Book(24234234, "This is a test book", 1));
- books.add(new Book(1234324, "This is a test book", 3));
-
- BookAuthor a = new BookAuthor(1, titles, "Test Author");
- a.books = books;
- a.bookType = BookAuthor.BookType.journal;
- record.setField(1, new SBookAuthorValue(a));
- }
-
- return record;
- }
- }
-
- public static final class PrintingOutputFormat implements OutputFormat<Record> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void configure(Configuration parameters) {}
-
- @Override
- public void open(int taskNumber, int numTasks) {}
-
- @Override
- public void writeRecord(Record record) throws IOException {
- long key = record.getField(0, LongValue.class).getValue();
- String val = record.getField(1, StringValue.class).getValue();
- System.out.println(key + " : " + val);
- }
-
- @Override
- public void close() {}
- }
-
- public static class MyCoGrouper extends CoGroupFunction {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out)
- throws Exception {
-
- Record r1 = null;
- if (records1.hasNext()) {
- r1 = records1.next();
- }
- Record r2 = null;
- if (records2.hasNext()) {
- r2 = records2.next();
- }
-
- if (r1 != null) {
- r1.getField(1, SBookAvroValue.class).datum();
- }
-
- if (r2 != null) {
- r2.getField(1, SBookAuthorValue.class).datum();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-addons/avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
deleted file mode 100644
index 76b23ef..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
+++ /dev/null
@@ -1,523 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.avro.DataInputDecoder;
-import org.apache.flink.api.avro.DataOutputEncoder;
-import org.apache.flink.api.java.record.io.avro.generated.Colors;
-import org.apache.flink.api.java.record.io.avro.generated.User;
-import org.apache.flink.util.StringUtils;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-
-/**
- * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization.
- */
-public class EncoderDecoderTest {
-
- @Test
- public void testComplexStringsDirecty() {
- try {
- Random rnd = new Random(349712539451944123L);
-
- for (int i = 0; i < 10; i++) {
- String testString = StringUtils.getRandomString(rnd, 10, 100);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
- {
- DataOutputStream dataOut = new DataOutputStream(baos);
- DataOutputEncoder encoder = new DataOutputEncoder();
- encoder.setOut(dataOut);
-
- encoder.writeString(testString);
- dataOut.flush();
- dataOut.close();
- }
-
- byte[] data = baos.toByteArray();
-
- // deserialize
- {
- ByteArrayInputStream bais = new ByteArrayInputStream(data);
- DataInputStream dataIn = new DataInputStream(bais);
- DataInputDecoder decoder = new DataInputDecoder();
- decoder.setIn(dataIn);
-
- String deserialized = decoder.readString();
-
- assertEquals(testString, deserialized);
- }
- }
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test failed due to an exception: " + e.getMessage());
- }
- }
-
- @Test
- public void testPrimitiveTypes() {
-
- testObjectSerialization(new Boolean(true));
- testObjectSerialization(new Boolean(false));
-
- testObjectSerialization(new Byte((byte) 0));
- testObjectSerialization(new Byte((byte) 1));
- testObjectSerialization(new Byte((byte) -1));
- testObjectSerialization(new Byte(Byte.MIN_VALUE));
- testObjectSerialization(new Byte(Byte.MAX_VALUE));
-
- testObjectSerialization(new Short((short) 0));
- testObjectSerialization(new Short((short) 1));
- testObjectSerialization(new Short((short) -1));
- testObjectSerialization(new Short(Short.MIN_VALUE));
- testObjectSerialization(new Short(Short.MAX_VALUE));
-
- testObjectSerialization(new Integer(0));
- testObjectSerialization(new Integer(1));
- testObjectSerialization(new Integer(-1));
- testObjectSerialization(new Integer(Integer.MIN_VALUE));
- testObjectSerialization(new Integer(Integer.MAX_VALUE));
-
- testObjectSerialization(new Long(0));
- testObjectSerialization(new Long(1));
- testObjectSerialization(new Long(-1));
- testObjectSerialization(new Long(Long.MIN_VALUE));
- testObjectSerialization(new Long(Long.MAX_VALUE));
-
- testObjectSerialization(new Float(0));
- testObjectSerialization(new Float(1));
- testObjectSerialization(new Float(-1));
- testObjectSerialization(new Float((float)Math.E));
- testObjectSerialization(new Float((float)Math.PI));
- testObjectSerialization(new Float(Float.MIN_VALUE));
- testObjectSerialization(new Float(Float.MAX_VALUE));
- testObjectSerialization(new Float(Float.MIN_NORMAL));
- testObjectSerialization(new Float(Float.NaN));
- testObjectSerialization(new Float(Float.NEGATIVE_INFINITY));
- testObjectSerialization(new Float(Float.POSITIVE_INFINITY));
-
- testObjectSerialization(new Double(0));
- testObjectSerialization(new Double(1));
- testObjectSerialization(new Double(-1));
- testObjectSerialization(new Double(Math.E));
- testObjectSerialization(new Double(Math.PI));
- testObjectSerialization(new Double(Double.MIN_VALUE));
- testObjectSerialization(new Double(Double.MAX_VALUE));
- testObjectSerialization(new Double(Double.MIN_NORMAL));
- testObjectSerialization(new Double(Double.NaN));
- testObjectSerialization(new Double(Double.NEGATIVE_INFINITY));
- testObjectSerialization(new Double(Double.POSITIVE_INFINITY));
-
- testObjectSerialization("");
- testObjectSerialization("abcdefg");
- testObjectSerialization("ab\u1535\u0155xyz\u706F");
-
- testObjectSerialization(new SimpleTypes(3637, 54876486548L, (byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523));
- testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) -65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 0.0000001));
- }
-
- @Test
- public void testArrayTypes() {
- {
- int[] array = new int[] {1, 2, 3, 4, 5};
- testObjectSerialization(array);
- }
- {
- long[] array = new long[] {1, 2, 3, 4, 5};
- testObjectSerialization(array);
- }
- {
- float[] array = new float[] {1, 2, 3, 4, 5};
- testObjectSerialization(array);
- }
- {
- double[] array = new double[] {1, 2, 3, 4, 5};
- testObjectSerialization(array);
- }
- {
- String[] array = new String[] {"Oh", "my", "what", "do", "we", "have", "here", "?"};
- testObjectSerialization(array);
- }
- }
-
- @Test
- public void testEmptyArray() {
- {
- int[] array = new int[0];
- testObjectSerialization(array);
- }
- {
- long[] array = new long[0];
- testObjectSerialization(array);
- }
- {
- float[] array = new float[0];
- testObjectSerialization(array);
- }
- {
- double[] array = new double[0];
- testObjectSerialization(array);
- }
- {
- String[] array = new String[0];
- testObjectSerialization(array);
- }
- }
-
- @Test
- public void testObjects() {
- // simple object containing only primitives
- {
- testObjectSerialization(new Book(976243875L, "The Serialization Odysse", 42));
- }
-
- // object with collection
- {
- ArrayList<String> list = new ArrayList<String>();
- list.add("A");
- list.add("B");
- list.add("C");
- list.add("D");
- list.add("E");
-
- testObjectSerialization(new BookAuthor(976243875L, list, "Arno Nym"));
- }
-
- // object with empty collection
- {
- ArrayList<String> list = new ArrayList<String>();
- testObjectSerialization(new BookAuthor(987654321L, list, "The Saurus"));
- }
- }
-
- @Test
- public void testNestedObjectsWithCollections() {
- testObjectSerialization(new ComplexNestedObject2(true));
- }
-
- @Test
- public void testGeneratedObjectWithNullableFields() {
- List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" });
- List<Boolean> bools = Arrays.asList(true, true, false, false, true, false, true, true);
- Map<CharSequence, Long> map = new HashMap<CharSequence, Long>();
- map.put("1", 1L);
- map.put("2", 2L);
- map.put("3", 3L);
-
- User user = new User("Freudenreich", 1337, "macintosh gray", 1234567890L, 3.1415926, null, true, strings, bools, null, Colors.GREEN, map);
-
- testObjectSerialization(user);
- }
-
- @Test
- public void testVarLenCountEncoding() {
- try {
- long[] values = new long[] { 0, 1, 2, 3, 4, 0, 574, 45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL};
-
- // write
- ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
- {
- DataOutputStream dataOut = new DataOutputStream(baos);
-
- for (long val : values) {
- DataOutputEncoder.writeVarLongCount(dataOut, val);
- }
-
- dataOut.flush();
- dataOut.close();
- }
-
- // read
- {
- ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
- DataInputStream dataIn = new DataInputStream(bais);
-
- for (long val : values) {
- long read = DataInputDecoder.readVarLongCount(dataIn);
- assertEquals("Wrong var-len encoded value read.", val, read);
- }
- }
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test failed due to an exception: " + e.getMessage());
- }
- }
-
- private static <X> void testObjectSerialization(X obj) {
-
- try {
-
- // serialize
- ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
- {
- DataOutputStream dataOut = new DataOutputStream(baos);
- DataOutputEncoder encoder = new DataOutputEncoder();
- encoder.setOut(dataOut);
-
- @SuppressWarnings("unchecked")
- Class<X> clazz = (Class<X>) obj.getClass();
- ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz);
-
- writer.write(obj, encoder);
- dataOut.flush();
- dataOut.close();
- }
-
- byte[] data = baos.toByteArray();
- X result = null;
-
- // deserialize
- {
- ByteArrayInputStream bais = new ByteArrayInputStream(data);
- DataInputStream dataIn = new DataInputStream(bais);
- DataInputDecoder decoder = new DataInputDecoder();
- decoder.setIn(dataIn);
-
- @SuppressWarnings("unchecked")
- Class<X> clazz = (Class<X>) obj.getClass();
- ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz);
-
- // create a reuse object if possible, otherwise we have no reuse object
- X reuse = null;
- try {
- @SuppressWarnings("unchecked")
- X test = (X) obj.getClass().newInstance();
- reuse = test;
- } catch (Throwable t) {}
-
- result = reader.read(reuse, decoder);
- }
-
- // check
- final String message = "Deserialized object is not the same as the original";
-
- if (obj.getClass().isArray()) {
- Class<?> clazz = obj.getClass();
- if (clazz == byte[].class) {
- assertArrayEquals(message, (byte[]) obj, (byte[]) result);
- }
- else if (clazz == short[].class) {
- assertArrayEquals(message, (short[]) obj, (short[]) result);
- }
- else if (clazz == int[].class) {
- assertArrayEquals(message, (int[]) obj, (int[]) result);
- }
- else if (clazz == long[].class) {
- assertArrayEquals(message, (long[]) obj, (long[]) result);
- }
- else if (clazz == char[].class) {
- assertArrayEquals(message, (char[]) obj, (char[]) result);
- }
- else if (clazz == float[].class) {
- assertArrayEquals(message, (float[]) obj, (float[]) result, 0.0f);
- }
- else if (clazz == double[].class) {
- assertArrayEquals(message, (double[]) obj, (double[]) result, 0.0);
- } else {
- assertArrayEquals(message, (Object[]) obj, (Object[]) result);
- }
- } else {
- assertEquals(message, obj, result);
- }
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test failed due to an exception: " + e.getMessage());
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Test Objects
- // --------------------------------------------------------------------------------------------
-
-
- public static final class SimpleTypes {
-
- private final int iVal;
- private final long lVal;
- private final byte bVal;
- private final String sVal;
- private final short rVal;
- private final double dVal;
-
-
- public SimpleTypes() {
- this(0, 0, (byte) 0, "", (short) 0, 0);
- }
-
- public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
- this.iVal = iVal;
- this.lVal = lVal;
- this.bVal = bVal;
- this.sVal = sVal;
- this.rVal = rVal;
- this.dVal = dVal;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == SimpleTypes.class) {
- SimpleTypes other = (SimpleTypes) obj;
-
- return other.iVal == this.iVal &&
- other.lVal == this.lVal &&
- other.bVal == this.bVal &&
- other.sVal.equals(this.sVal) &&
- other.rVal == this.rVal &&
- other.dVal == this.dVal;
-
- } else {
- return false;
- }
- }
- }
-
- public static class ComplexNestedObject1 {
-
- private double doubleValue;
-
- private List<String> stringList;
-
- public ComplexNestedObject1() {}
-
- public ComplexNestedObject1(int offInit) {
- this.doubleValue = 6293485.6723 + offInit;
-
- this.stringList = new ArrayList<String>();
- this.stringList.add("A" + offInit);
- this.stringList.add("somewhat" + offInit);
- this.stringList.add("random" + offInit);
- this.stringList.add("collection" + offInit);
- this.stringList.add("of" + offInit);
- this.stringList.add("strings" + offInit);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == ComplexNestedObject1.class) {
- ComplexNestedObject1 other = (ComplexNestedObject1) obj;
- return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList);
- } else {
- return false;
- }
- }
- }
-
- public static class ComplexNestedObject2 {
-
- private long longValue;
-
- private Map<String, ComplexNestedObject1> theMap;
-
- public ComplexNestedObject2() {}
-
- public ComplexNestedObject2(boolean init) {
- this.longValue = 46547;
-
- this.theMap = new HashMap<String, ComplexNestedObject1>();
- this.theMap.put("36354L", new ComplexNestedObject1(43546543));
- this.theMap.put("785611L", new ComplexNestedObject1(45784568));
- this.theMap.put("43L", new ComplexNestedObject1(9876543));
- this.theMap.put("-45687L", new ComplexNestedObject1(7897615));
- this.theMap.put("1919876876896L", new ComplexNestedObject1(27154));
- this.theMap.put("-868468468L", new ComplexNestedObject1(546435));
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == ComplexNestedObject2.class) {
- ComplexNestedObject2 other = (ComplexNestedObject2) obj;
- return other.longValue == this.longValue && this.theMap.equals(other.theMap);
- } else {
- return false;
- }
- }
- }
-
- public static class Book {
-
- private long bookId;
- private String title;
- private long authorId;
-
- public Book() {}
-
- public Book(long bookId, String title, long authorId) {
- this.bookId = bookId;
- this.title = title;
- this.authorId = authorId;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == Book.class) {
- Book other = (Book) obj;
- return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title);
- } else {
- return false;
- }
- }
- }
-
- public static class BookAuthor {
-
- private long authorId;
- private List<String> bookTitles;
- private String authorName;
-
- public BookAuthor() {}
-
- public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
- this.authorId = authorId;
- this.bookTitles = bookTitles;
- this.authorName = authorName;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == BookAuthor.class) {
- BookAuthor other = (BookAuthor) obj;
- return other.authorName.equals(this.authorName) && other.authorId == this.authorId &&
- other.bookTitles.equals(this.bookTitles);
- } else {
- return false;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-addons/avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
deleted file mode 100644
index 146c72b..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.avro.testjar;
-
-// ================================================================================================
-// This file defines the classes for the AvroExternalJarProgramITCase.
-// The program is exported into src/test/resources/AvroTestProgram.jar.
-//
-// THIS FILE MUST STAY FULLY COMMENTED SUCH THAT THE HERE DEFINED CLASSES ARE NOT COMPILED
-// AND ADDED TO THE test-classes DIRECTORY. OTHERWISE, THE EXTERNAL CLASS LOADING WILL
-// NOT BE COVERED BY THIS TEST.
-// ================================================================================================
-
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.io.DiscardingOuputFormat;
-import org.apache.flink.core.fs.Path;
-
-public class AvroExternalJarProgram {
-
- public static final class Color {
-
- private String name;
- private double saturation;
-
- public Color() {
- name = "";
- saturation = 1.0;
- }
-
- public Color(String name, double saturation) {
- this.name = name;
- this.saturation = saturation;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public double getSaturation() {
- return saturation;
- }
-
- public void setSaturation(double saturation) {
- this.saturation = saturation;
- }
-
- @Override
- public String toString() {
- return name + '(' + saturation + ')';
- }
- }
-
- public static final class MyUser {
-
- private String name;
- private List<Color> colors;
-
- public MyUser() {
- name = "unknown";
- colors = new ArrayList<Color>();
- }
-
- public MyUser(String name, List<Color> colors) {
- this.name = name;
- this.colors = colors;
- }
-
- public String getName() {
- return name;
- }
-
- public List<Color> getColors() {
- return colors;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public void setColors(List<Color> colors) {
- this.colors = colors;
- }
-
- @Override
- public String toString() {
- return name + " : " + colors;
- }
- }
-
-
- public static final class SUser extends AvroBaseValue<MyUser> {
-
- static final long serialVersionUID = 1L;
-
- public SUser() {}
-
- public SUser(MyUser u) {
- super(u);
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- // --------------------------------------------------------------------------------------------
-
- public static final class NameExtractor extends MapFunction<MyUser, Tuple2<String, MyUser>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<String, MyUser> map(MyUser u) {
- String namePrefix = u.getName().substring(0, 1);
- return new Tuple2<String, MyUser>(namePrefix, u);
- }
- }
-
- public static final class NameGrouper extends ReduceFunction<Tuple2<String, MyUser>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) {
- return val1;
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Test Data
- // --------------------------------------------------------------------------------------------
-
- public static final class Generator {
-
- private final Random rnd = new Random(2389756789345689276L);
-
- public MyUser nextUser() {
- return randomUser();
- }
-
- private MyUser randomUser() {
-
- int numColors = rnd.nextInt(5);
- ArrayList<Color> colors = new ArrayList<Color>(numColors);
- for (int i = 0; i < numColors; i++) {
- colors.add(new Color(randomString(), rnd.nextDouble()));
- }
-
- return new MyUser(randomString(), colors);
- }
-
- private String randomString() {
- char[] c = new char[this.rnd.nextInt(20) + 5];
-
- for (int i = 0; i < c.length; i++) {
- c[i] = (char) (this.rnd.nextInt(150) + 40);
- }
-
- return new String(c);
- }
- }
-
- public static void writeTestData(File testFile, int numRecords) throws IOException {
-
- DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class);
- DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter);
-
- dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);
-
-
- Generator generator = new Generator();
-
- for (int i = 0; i < numRecords; i++) {
- MyUser user = generator.nextUser();
- dataFileWriter.append(user);
- }
-
- dataFileWriter.close();
- }
-
-// public static void main(String[] args) throws Exception {
-// String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath();
-// writeTestData(new File(testDataFile), 50);
-// }
-
- public static void main(String[] args) throws Exception {
- String inputPath = args[0];
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class));
-
- DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper());
-
- result.output(new DiscardingOuputFormat<Tuple2<String,MyUser>>());
- env.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java b/flink-addons/avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
deleted file mode 100644
index aa08006..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.io;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.TypeInformation;
-
-public class AvroInputFormatTypeExtractionTest {
-
- @Test
- public void testTypeExtraction() {
- try {
- InputFormat<MyAvroType, ?> format = new AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), MyAvroType.class);
-
- TypeInformation<?> typeInfoDirect = TypeExtractor.getInputFormatTypes(format);
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<MyAvroType> input = env.createInput(format);
- TypeInformation<?> typeInfoDataSet = input.getType();
-
-
- Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo);
- Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo);
-
- Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass());
- Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass());
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- public static final class MyAvroType {
-
- public String theString;
-
- private double aDouble;
-
- public double getaDouble() {
- return aDouble;
- }
-
- public void setaDouble(double aDouble) {
- this.aDouble = aDouble;
- }
-
- public void setTheString(String theString) {
- this.theString = theString;
- }
-
- public String getTheString() {
- return theString;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java b/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
deleted file mode 100644
index 2387fd6..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.avro;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import junit.framework.Assert;
-
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat;
-import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.BooleanListValue;
-import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.LongMapValue;
-import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.StringListValue;
-import org.apache.flink.api.java.record.io.avro.generated.Colors;
-import org.apache.flink.api.java.record.io.avro.generated.User;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-
-/**
- * Test the avro input format.
- * (The testcase is mostly the getting started tutorial of avro)
- * http://avro.apache.org/docs/current/gettingstartedjava.html
- */
-public class AvroRecordInputFormatTest {
-
- private File testFile;
-
- private final AvroRecordInputFormat format = new AvroRecordInputFormat();
- final static String TEST_NAME = "Alyssa";
-
- final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
- final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
-
- final static boolean TEST_ARRAY_BOOLEAN_1 = true;
- final static boolean TEST_ARRAY_BOOLEAN_2 = false;
-
- final static Colors TEST_ENUM_COLOR = Colors.GREEN;
-
- final static CharSequence TEST_MAP_KEY1 = "KEY 1";
- final static long TEST_MAP_VALUE1 = 8546456L;
- final static CharSequence TEST_MAP_KEY2 = "KEY 2";
- final static long TEST_MAP_VALUE2 = 17554L;
-
-
- @Before
- public void createFiles() throws IOException {
- testFile = File.createTempFile("AvroInputFormatTest", null);
-
- ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
- stringArray.add(TEST_ARRAY_STRING_1);
- stringArray.add(TEST_ARRAY_STRING_2);
-
- ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
- booleanArray.add(TEST_ARRAY_BOOLEAN_1);
- booleanArray.add(TEST_ARRAY_BOOLEAN_2);
-
- HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
- longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
- longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
-
-
- User user1 = new User();
- user1.setName(TEST_NAME);
- user1.setFavoriteNumber(256);
- user1.setTypeDoubleTest(123.45d);
- user1.setTypeBoolTest(true);
- user1.setTypeArrayString(stringArray);
- user1.setTypeArrayBoolean(booleanArray);
- user1.setTypeEnum(TEST_ENUM_COLOR);
- user1.setTypeMap(longMap);
-
- // Construct via builder
- User user2 = User.newBuilder()
- .setName("Charlie")
- .setFavoriteColor("blue")
- .setFavoriteNumber(null)
- .setTypeBoolTest(false)
- .setTypeDoubleTest(1.337d)
- .setTypeNullTest(null)
- .setTypeLongTest(1337L)
- .setTypeArrayString(new ArrayList<CharSequence>())
- .setTypeArrayBoolean(new ArrayList<Boolean>())
- .setTypeNullableArray(null)
- .setTypeEnum(Colors.RED)
- .setTypeMap(new HashMap<CharSequence, Long>())
- .build();
- DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
- DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
- dataFileWriter.create(user1.getSchema(), testFile);
- dataFileWriter.append(user1);
- dataFileWriter.append(user2);
- dataFileWriter.close();
- }
-
- @Test
- public void testDeserialisation() throws IOException {
- Configuration parameters = new Configuration();
- format.setFilePath(testFile.toURI().toString());
- format.configure(parameters);
- FileInputSplit[] splits = format.createInputSplits(1);
- Assert.assertEquals(splits.length, 1);
- format.open(splits[0]);
- Record record = new Record();
- Assert.assertNotNull(format.nextRecord(record));
- StringValue name = record.getField(0, StringValue.class);
- Assert.assertNotNull("empty record", name);
- Assert.assertEquals("name not equal",name.getValue(), TEST_NAME);
-
- // check arrays
- StringListValue sl = record.getField(7, AvroRecordInputFormat.StringListValue.class);
- Assert.assertEquals("element 0 not equal", sl.get(0).getValue(), TEST_ARRAY_STRING_1);
- Assert.assertEquals("element 1 not equal", sl.get(1).getValue(), TEST_ARRAY_STRING_2);
-
- BooleanListValue bl = record.getField(8, AvroRecordInputFormat.BooleanListValue.class);
- Assert.assertEquals("element 0 not equal", bl.get(0).getValue(), TEST_ARRAY_BOOLEAN_1);
- Assert.assertEquals("element 1 not equal", bl.get(1).getValue(), TEST_ARRAY_BOOLEAN_2);
-
- // check enums
- StringValue enumValue = record.getField(10, StringValue.class);
- Assert.assertEquals("string representation of enum not equal", enumValue.getValue(), TEST_ENUM_COLOR.toString());
-
- // check maps
- LongMapValue lm = record.getField(11, AvroRecordInputFormat.LongMapValue.class);
- Assert.assertEquals("map value of key 1 not equal", lm.get(new StringValue(TEST_MAP_KEY1)).getValue(), TEST_MAP_VALUE1);
- Assert.assertEquals("map value of key 2 not equal", lm.get(new StringValue(TEST_MAP_KEY2)).getValue(), TEST_MAP_VALUE2);
-
-
- Assert.assertFalse("expecting second element", format.reachedEnd());
- Assert.assertNotNull("expecting second element", format.nextRecord(record));
-
- Assert.assertNull(format.nextRecord(record));
- Assert.assertTrue(format.reachedEnd());
-
- format.close();
- }
-
- @After
- public void deleteFiles() {
- testFile.delete();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java b/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java
deleted file mode 100644
index 4cacb7f..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- *
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.java.record.io.avro.generated;
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public enum Colors {
- RED, GREEN, BLUE ;
- public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Colors\",\"namespace\":\"org.apache.flink.api.java.record.io.avro.generated\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}");
- public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java b/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java
deleted file mode 100644
index 61bbe41..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java
+++ /dev/null
@@ -1,755 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- *
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.java.record.io.avro.generated;
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
- public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.java.record.io.avro.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]},{\"name\":\"type_double_test\",\"type\":[\"double\"]},{\"name\":\"type_null_test\",\"type\":[\"null\"]},{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\",\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\",\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"n
ame\":\"type_map\",\"type\":{\"type\":\"map\",\"values\":\"long\"}}]}");
- public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
- @Deprecated public java.lang.CharSequence name;
- @Deprecated public java.lang.Integer favorite_number;
- @Deprecated public java.lang.CharSequence favorite_color;
- @Deprecated public java.lang.Long type_long_test;
- @Deprecated public java.lang.Object type_double_test;
- @Deprecated public java.lang.Object type_null_test;
- @Deprecated public java.lang.Object type_bool_test;
- @Deprecated public java.util.List<java.lang.CharSequence> type_array_string;
- @Deprecated public java.util.List<java.lang.Boolean> type_array_boolean;
- @Deprecated public java.util.List<java.lang.CharSequence> type_nullable_array;
- @Deprecated public org.apache.flink.api.java.record.io.avro.generated.Colors type_enum;
- @Deprecated public java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
-
- /**
- * Default constructor. Note that this does not initialize fields
- * to their default values from the schema. If that is desired then
- * one should use {@link \#newBuilder()}.
- */
- public User() {}
-
- /**
- * All-args constructor.
- */
- public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color, java.lang.Long type_long_test, java.lang.Object type_double_test, java.lang.Object type_null_test, java.lang.Object type_bool_test, java.util.List<java.lang.CharSequence> type_array_string, java.util.List<java.lang.Boolean> type_array_boolean, java.util.List<java.lang.CharSequence> type_nullable_array, org.apache.flink.api.java.record.io.avro.generated.Colors type_enum, java.util.Map<java.lang.CharSequence,java.lang.Long> type_map) {
- this.name = name;
- this.favorite_number = favorite_number;
- this.favorite_color = favorite_color;
- this.type_long_test = type_long_test;
- this.type_double_test = type_double_test;
- this.type_null_test = type_null_test;
- this.type_bool_test = type_bool_test;
- this.type_array_string = type_array_string;
- this.type_array_boolean = type_array_boolean;
- this.type_nullable_array = type_nullable_array;
- this.type_enum = type_enum;
- this.type_map = type_map;
- }
-
- public org.apache.avro.Schema getSchema() { return SCHEMA$; }
- // Used by DatumWriter. Applications should not call.
- public java.lang.Object get(int field$) {
- switch (field$) {
- case 0: return name;
- case 1: return favorite_number;
- case 2: return favorite_color;
- case 3: return type_long_test;
- case 4: return type_double_test;
- case 5: return type_null_test;
- case 6: return type_bool_test;
- case 7: return type_array_string;
- case 8: return type_array_boolean;
- case 9: return type_nullable_array;
- case 10: return type_enum;
- case 11: return type_map;
- default: throw new org.apache.avro.AvroRuntimeException("Bad index");
- }
- }
- // Used by DatumReader. Applications should not call.
- @SuppressWarnings(value="unchecked")
- public void put(int field$, java.lang.Object value$) {
- switch (field$) {
- case 0: name = (java.lang.CharSequence)value$; break;
- case 1: favorite_number = (java.lang.Integer)value$; break;
- case 2: favorite_color = (java.lang.CharSequence)value$; break;
- case 3: type_long_test = (java.lang.Long)value$; break;
- case 4: type_double_test = (java.lang.Object)value$; break;
- case 5: type_null_test = (java.lang.Object)value$; break;
- case 6: type_bool_test = (java.lang.Object)value$; break;
- case 7: type_array_string = (java.util.List<java.lang.CharSequence>)value$; break;
- case 8: type_array_boolean = (java.util.List<java.lang.Boolean>)value$; break;
- case 9: type_nullable_array = (java.util.List<java.lang.CharSequence>)value$; break;
- case 10: type_enum = (org.apache.flink.api.java.record.io.avro.generated.Colors)value$; break;
- case 11: type_map = (java.util.Map<java.lang.CharSequence,java.lang.Long>)value$; break;
- default: throw new org.apache.avro.AvroRuntimeException("Bad index");
- }
- }
-
- /**
- * Gets the value of the 'name' field.
- */
- public java.lang.CharSequence getName() {
- return name;
- }
-
- /**
- * Sets the value of the 'name' field.
- * @param value the value to set.
- */
- public void setName(java.lang.CharSequence value) {
- this.name = value;
- }
-
- /**
- * Gets the value of the 'favorite_number' field.
- */
- public java.lang.Integer getFavoriteNumber() {
- return favorite_number;
- }
-
- /**
- * Sets the value of the 'favorite_number' field.
- * @param value the value to set.
- */
- public void setFavoriteNumber(java.lang.Integer value) {
- this.favorite_number = value;
- }
-
- /**
- * Gets the value of the 'favorite_color' field.
- */
- public java.lang.CharSequence getFavoriteColor() {
- return favorite_color;
- }
-
- /**
- * Sets the value of the 'favorite_color' field.
- * @param value the value to set.
- */
- public void setFavoriteColor(java.lang.CharSequence value) {
- this.favorite_color = value;
- }
-
- /**
- * Gets the value of the 'type_long_test' field.
- */
- public java.lang.Long getTypeLongTest() {
- return type_long_test;
- }
-
- /**
- * Sets the value of the 'type_long_test' field.
- * @param value the value to set.
- */
- public void setTypeLongTest(java.lang.Long value) {
- this.type_long_test = value;
- }
-
- /**
- * Gets the value of the 'type_double_test' field.
- */
- public java.lang.Object getTypeDoubleTest() {
- return type_double_test;
- }
-
- /**
- * Sets the value of the 'type_double_test' field.
- * @param value the value to set.
- */
- public void setTypeDoubleTest(java.lang.Object value) {
- this.type_double_test = value;
- }
-
- /**
- * Gets the value of the 'type_null_test' field.
- */
- public java.lang.Object getTypeNullTest() {
- return type_null_test;
- }
-
- /**
- * Sets the value of the 'type_null_test' field.
- * @param value the value to set.
- */
- public void setTypeNullTest(java.lang.Object value) {
- this.type_null_test = value;
- }
-
- /**
- * Gets the value of the 'type_bool_test' field.
- */
- public java.lang.Object getTypeBoolTest() {
- return type_bool_test;
- }
-
- /**
- * Sets the value of the 'type_bool_test' field.
- * @param value the value to set.
- */
- public void setTypeBoolTest(java.lang.Object value) {
- this.type_bool_test = value;
- }
-
- /**
- * Gets the value of the 'type_array_string' field.
- */
- public java.util.List<java.lang.CharSequence> getTypeArrayString() {
- return type_array_string;
- }
-
- /**
- * Sets the value of the 'type_array_string' field.
- * @param value the value to set.
- */
- public void setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
- this.type_array_string = value;
- }
-
- /**
- * Gets the value of the 'type_array_boolean' field.
- */
- public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
- return type_array_boolean;
- }
-
- /**
- * Sets the value of the 'type_array_boolean' field.
- * @param value the value to set.
- */
- public void setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
- this.type_array_boolean = value;
- }
-
- /**
- * Gets the value of the 'type_nullable_array' field.
- */
- public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
- return type_nullable_array;
- }
-
- /**
- * Sets the value of the 'type_nullable_array' field.
- * @param value the value to set.
- */
- public void setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
- this.type_nullable_array = value;
- }
-
- /**
- * Gets the value of the 'type_enum' field.
- */
- public org.apache.flink.api.java.record.io.avro.generated.Colors getTypeEnum() {
- return type_enum;
- }
-
- /**
- * Sets the value of the 'type_enum' field.
- * @param value the value to set.
- */
- public void setTypeEnum(org.apache.flink.api.java.record.io.avro.generated.Colors value) {
- this.type_enum = value;
- }
-
- /**
- * Gets the value of the 'type_map' field.
- */
- public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
- return type_map;
- }
-
- /**
- * Sets the value of the 'type_map' field.
- * @param value the value to set.
- */
- public void setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
- this.type_map = value;
- }
-
- /** Creates a new User RecordBuilder */
- public static org.apache.flink.api.java.record.io.avro.generated.User.Builder newBuilder() {
- return new org.apache.flink.api.java.record.io.avro.generated.User.Builder();
- }
-
- /** Creates a new User RecordBuilder by copying an existing Builder */
- public static org.apache.flink.api.java.record.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.generated.User.Builder other) {
- return new org.apache.flink.api.java.record.io.avro.generated.User.Builder(other);
- }
-
- /** Creates a new User RecordBuilder by copying an existing User instance */
- public static org.apache.flink.api.java.record.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.generated.User other) {
- return new org.apache.flink.api.java.record.io.avro.generated.User.Builder(other);
- }
-
- /**
- * RecordBuilder for User instances.
- */
- public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
- implements org.apache.avro.data.RecordBuilder<User> {
-
- private java.lang.CharSequence name;
- private java.lang.Integer favorite_number;
- private java.lang.CharSequence favorite_color;
- private java.lang.Long type_long_test;
- private java.lang.Object type_double_test;
- private java.lang.Object type_null_test;
- private java.lang.Object type_bool_test;
- private java.util.List<java.lang.CharSequence> type_array_string;
- private java.util.List<java.lang.Boolean> type_array_boolean;
- private java.util.List<java.lang.CharSequence> type_nullable_array;
- private org.apache.flink.api.java.record.io.avro.generated.Colors type_enum;
- private java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
-
- /** Creates a new Builder */
- private Builder() {
- super(org.apache.flink.api.java.record.io.avro.generated.User.SCHEMA$);
- }
-
- /** Creates a Builder by copying an existing Builder */
- private Builder(org.apache.flink.api.java.record.io.avro.generated.User.Builder other) {
- super(other);
- if (isValidValue(fields()[0], other.name)) {
- this.name = data().deepCopy(fields()[0].schema(), other.name);
- fieldSetFlags()[0] = true;
- }
- if (isValidValue(fields()[1], other.favorite_number)) {
- this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
- fieldSetFlags()[1] = true;
- }
- if (isValidValue(fields()[2], other.favorite_color)) {
- this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
- fieldSetFlags()[2] = true;
- }
- if (isValidValue(fields()[3], other.type_long_test)) {
- this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
- fieldSetFlags()[3] = true;
- }
- if (isValidValue(fields()[4], other.type_double_test)) {
- this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
- fieldSetFlags()[4] = true;
- }
- if (isValidValue(fields()[5], other.type_null_test)) {
- this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
- fieldSetFlags()[5] = true;
- }
- if (isValidValue(fields()[6], other.type_bool_test)) {
- this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
- fieldSetFlags()[6] = true;
- }
- if (isValidValue(fields()[7], other.type_array_string)) {
- this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
- fieldSetFlags()[7] = true;
- }
- if (isValidValue(fields()[8], other.type_array_boolean)) {
- this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
- fieldSetFlags()[8] = true;
- }
- if (isValidValue(fields()[9], other.type_nullable_array)) {
- this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
- fieldSetFlags()[9] = true;
- }
- if (isValidValue(fields()[10], other.type_enum)) {
- this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
- fieldSetFlags()[10] = true;
- }
- if (isValidValue(fields()[11], other.type_map)) {
- this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
- fieldSetFlags()[11] = true;
- }
- }
-
- /** Creates a Builder by copying an existing User instance */
- private Builder(org.apache.flink.api.java.record.io.avro.generated.User other) {
- super(org.apache.flink.api.java.record.io.avro.generated.User.SCHEMA$);
- if (isValidValue(fields()[0], other.name)) {
- this.name = data().deepCopy(fields()[0].schema(), other.name);
- fieldSetFlags()[0] = true;
- }
- if (isValidValue(fields()[1], other.favorite_number)) {
- this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
- fieldSetFlags()[1] = true;
- }
- if (isValidValue(fields()[2], other.favorite_color)) {
- this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
- fieldSetFlags()[2] = true;
- }
- if (isValidValue(fields()[3], other.type_long_test)) {
- this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
- fieldSetFlags()[3] = true;
- }
- if (isValidValue(fields()[4], other.type_double_test)) {
- this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
- fieldSetFlags()[4] = true;
- }
- if (isValidValue(fields()[5], other.type_null_test)) {
- this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
- fieldSetFlags()[5] = true;
- }
- if (isValidValue(fields()[6], other.type_bool_test)) {
- this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
- fieldSetFlags()[6] = true;
- }
- if (isValidValue(fields()[7], other.type_array_string)) {
- this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
- fieldSetFlags()[7] = true;
- }
- if (isValidValue(fields()[8], other.type_array_boolean)) {
- this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
- fieldSetFlags()[8] = true;
- }
- if (isValidValue(fields()[9], other.type_nullable_array)) {
- this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
- fieldSetFlags()[9] = true;
- }
- if (isValidValue(fields()[10], other.type_enum)) {
- this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
- fieldSetFlags()[10] = true;
- }
- if (isValidValue(fields()[11], other.type_map)) {
- this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
- fieldSetFlags()[11] = true;
- }
- }
-
- /** Gets the value of the 'name' field */
- public java.lang.CharSequence getName() {
- return name;
- }
-
- /** Sets the value of the 'name' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setName(java.lang.CharSequence value) {
- validate(fields()[0], value);
- this.name = value;
- fieldSetFlags()[0] = true;
- return this;
- }
-
- /** Checks whether the 'name' field has been set */
- public boolean hasName() {
- return fieldSetFlags()[0];
- }
-
- /** Clears the value of the 'name' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearName() {
- name = null;
- fieldSetFlags()[0] = false;
- return this;
- }
-
- /** Gets the value of the 'favorite_number' field */
- public java.lang.Integer getFavoriteNumber() {
- return favorite_number;
- }
-
- /** Sets the value of the 'favorite_number' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setFavoriteNumber(java.lang.Integer value) {
- validate(fields()[1], value);
- this.favorite_number = value;
- fieldSetFlags()[1] = true;
- return this;
- }
-
- /** Checks whether the 'favorite_number' field has been set */
- public boolean hasFavoriteNumber() {
- return fieldSetFlags()[1];
- }
-
- /** Clears the value of the 'favorite_number' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearFavoriteNumber() {
- favorite_number = null;
- fieldSetFlags()[1] = false;
- return this;
- }
-
- /** Gets the value of the 'favorite_color' field */
- public java.lang.CharSequence getFavoriteColor() {
- return favorite_color;
- }
-
- /** Sets the value of the 'favorite_color' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setFavoriteColor(java.lang.CharSequence value) {
- validate(fields()[2], value);
- this.favorite_color = value;
- fieldSetFlags()[2] = true;
- return this;
- }
-
- /** Checks whether the 'favorite_color' field has been set */
- public boolean hasFavoriteColor() {
- return fieldSetFlags()[2];
- }
-
- /** Clears the value of the 'favorite_color' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearFavoriteColor() {
- favorite_color = null;
- fieldSetFlags()[2] = false;
- return this;
- }
-
- /** Gets the value of the 'type_long_test' field */
- public java.lang.Long getTypeLongTest() {
- return type_long_test;
- }
-
- /** Sets the value of the 'type_long_test' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeLongTest(java.lang.Long value) {
- validate(fields()[3], value);
- this.type_long_test = value;
- fieldSetFlags()[3] = true;
- return this;
- }
-
- /** Checks whether the 'type_long_test' field has been set */
- public boolean hasTypeLongTest() {
- return fieldSetFlags()[3];
- }
-
- /** Clears the value of the 'type_long_test' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeLongTest() {
- type_long_test = null;
- fieldSetFlags()[3] = false;
- return this;
- }
-
- /** Gets the value of the 'type_double_test' field */
- public java.lang.Object getTypeDoubleTest() {
- return type_double_test;
- }
-
- /** Sets the value of the 'type_double_test' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeDoubleTest(java.lang.Object value) {
- validate(fields()[4], value);
- this.type_double_test = value;
- fieldSetFlags()[4] = true;
- return this;
- }
-
- /** Checks whether the 'type_double_test' field has been set */
- public boolean hasTypeDoubleTest() {
- return fieldSetFlags()[4];
- }
-
- /** Clears the value of the 'type_double_test' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeDoubleTest() {
- type_double_test = null;
- fieldSetFlags()[4] = false;
- return this;
- }
-
- /** Gets the value of the 'type_null_test' field */
- public java.lang.Object getTypeNullTest() {
- return type_null_test;
- }
-
- /** Sets the value of the 'type_null_test' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeNullTest(java.lang.Object value) {
- validate(fields()[5], value);
- this.type_null_test = value;
- fieldSetFlags()[5] = true;
- return this;
- }
-
- /** Checks whether the 'type_null_test' field has been set */
- public boolean hasTypeNullTest() {
- return fieldSetFlags()[5];
- }
-
- /** Clears the value of the 'type_null_test' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeNullTest() {
- type_null_test = null;
- fieldSetFlags()[5] = false;
- return this;
- }
-
- /** Gets the value of the 'type_bool_test' field */
- public java.lang.Object getTypeBoolTest() {
- return type_bool_test;
- }
-
- /** Sets the value of the 'type_bool_test' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeBoolTest(java.lang.Object value) {
- validate(fields()[6], value);
- this.type_bool_test = value;
- fieldSetFlags()[6] = true;
- return this;
- }
-
- /** Checks whether the 'type_bool_test' field has been set */
- public boolean hasTypeBoolTest() {
- return fieldSetFlags()[6];
- }
-
- /** Clears the value of the 'type_bool_test' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeBoolTest() {
- type_bool_test = null;
- fieldSetFlags()[6] = false;
- return this;
- }
-
- /** Gets the value of the 'type_array_string' field */
- public java.util.List<java.lang.CharSequence> getTypeArrayString() {
- return type_array_string;
- }
-
- /** Sets the value of the 'type_array_string' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
- validate(fields()[7], value);
- this.type_array_string = value;
- fieldSetFlags()[7] = true;
- return this;
- }
-
- /** Checks whether the 'type_array_string' field has been set */
- public boolean hasTypeArrayString() {
- return fieldSetFlags()[7];
- }
-
- /** Clears the value of the 'type_array_string' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeArrayString() {
- type_array_string = null;
- fieldSetFlags()[7] = false;
- return this;
- }
-
- /** Gets the value of the 'type_array_boolean' field */
- public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
- return type_array_boolean;
- }
-
- /** Sets the value of the 'type_array_boolean' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
- validate(fields()[8], value);
- this.type_array_boolean = value;
- fieldSetFlags()[8] = true;
- return this;
- }
-
- /** Checks whether the 'type_array_boolean' field has been set */
- public boolean hasTypeArrayBoolean() {
- return fieldSetFlags()[8];
- }
-
- /** Clears the value of the 'type_array_boolean' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeArrayBoolean() {
- type_array_boolean = null;
- fieldSetFlags()[8] = false;
- return this;
- }
-
- /** Gets the value of the 'type_nullable_array' field */
- public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
- return type_nullable_array;
- }
-
- /** Sets the value of the 'type_nullable_array' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
- validate(fields()[9], value);
- this.type_nullable_array = value;
- fieldSetFlags()[9] = true;
- return this;
- }
-
- /** Checks whether the 'type_nullable_array' field has been set */
- public boolean hasTypeNullableArray() {
- return fieldSetFlags()[9];
- }
-
- /** Clears the value of the 'type_nullable_array' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeNullableArray() {
- type_nullable_array = null;
- fieldSetFlags()[9] = false;
- return this;
- }
-
- /** Gets the value of the 'type_enum' field */
- public org.apache.flink.api.java.record.io.avro.generated.Colors getTypeEnum() {
- return type_enum;
- }
-
- /** Sets the value of the 'type_enum' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeEnum(org.apache.flink.api.java.record.io.avro.generated.Colors value) {
- validate(fields()[10], value);
- this.type_enum = value;
- fieldSetFlags()[10] = true;
- return this;
- }
-
- /** Checks whether the 'type_enum' field has been set */
- public boolean hasTypeEnum() {
- return fieldSetFlags()[10];
- }
-
- /** Clears the value of the 'type_enum' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeEnum() {
- type_enum = null;
- fieldSetFlags()[10] = false;
- return this;
- }
-
- /** Gets the value of the 'type_map' field */
- public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
- return type_map;
- }
-
- /** Sets the value of the 'type_map' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
- validate(fields()[11], value);
- this.type_map = value;
- fieldSetFlags()[11] = true;
- return this;
- }
-
- /** Checks whether the 'type_map' field has been set */
- public boolean hasTypeMap() {
- return fieldSetFlags()[11];
- }
-
- /** Clears the value of the 'type_map' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeMap() {
- type_map = null;
- fieldSetFlags()[11] = false;
- return this;
- }
-
- @Override
- public User build() {
- try {
- User record = new User();
- record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
- record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
- record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
- record.type_long_test = fieldSetFlags()[3] ? this.type_long_test : (java.lang.Long) defaultValue(fields()[3]);
- record.type_double_test = fieldSetFlags()[4] ? this.type_double_test : (java.lang.Object) defaultValue(fields()[4]);
- record.type_null_test = fieldSetFlags()[5] ? this.type_null_test : (java.lang.Object) defaultValue(fields()[5]);
- record.type_bool_test = fieldSetFlags()[6] ? this.type_bool_test : (java.lang.Object) defaultValue(fields()[6]);
- record.type_array_string = fieldSetFlags()[7] ? this.type_array_string : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[7]);
- record.type_array_boolean = fieldSetFlags()[8] ? this.type_array_boolean : (java.util.List<java.lang.Boolean>) defaultValue(fields()[8]);
- record.type_nullable_array = fieldSetFlags()[9] ? this.type_nullable_array : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[9]);
- record.type_enum = fieldSetFlags()[10] ? this.type_enum : (org.apache.flink.api.java.record.io.avro.generated.Colors) defaultValue(fields()[10]);
- record.type_map = fieldSetFlags()[11] ? this.type_map : (java.util.Map<java.lang.CharSequence,java.lang.Long>) defaultValue(fields()[11]);
- return record;
- } catch (Exception e) {
- throw new org.apache.avro.AvroRuntimeException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/resources/avro/user.avsc b/flink-addons/avro/src/test/resources/avro/user.avsc
deleted file mode 100644
index af3cb75..0000000
--- a/flink-addons/avro/src/test/resources/avro/user.avsc
+++ /dev/null
@@ -1,19 +0,0 @@
-
-{"namespace": "org.apache.flink.api.java.record.io.avro.generated",
- "type": "record",
- "name": "User",
- "fields": [
- {"name": "name", "type": "string"},
- {"name": "favorite_number", "type": ["int", "null"]},
- {"name": "favorite_color", "type": ["string", "null"]},
- {"name": "type_long_test", "type": ["long", "null"]},
- {"name": "type_double_test", "type": ["double"]},
- {"name": "type_null_test", "type": ["null"]},
- {"name": "type_bool_test", "type": ["boolean"]},
- {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}},
- {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}},
- {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null},
- {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}},
- {"name": "type_map", "type": {"type": "map", "values": "long"}}
- ]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/resources/testdata.avro
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/resources/testdata.avro b/flink-addons/avro/src/test/resources/testdata.avro
deleted file mode 100644
index 45308b9..0000000
Binary files a/flink-addons/avro/src/test/resources/testdata.avro and /dev/null differ