You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/03 15:17:33 UTC
[2/3] flink git commit: [FLINK-9444] [formats] Add full SQL support for Avro formats
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
index 92d2c31..84849a6 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
@@ -47,6 +48,9 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -55,6 +59,8 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -100,15 +106,15 @@ public class AvroRecordInputFormatTest {
private Schema userSchema = new User().getSchema();
public static void writeTestFile(File testFile) throws IOException {
- ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
+ ArrayList<CharSequence> stringArray = new ArrayList<>();
stringArray.add(TEST_ARRAY_STRING_1);
stringArray.add(TEST_ARRAY_STRING_2);
- ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
+ ArrayList<Boolean> booleanArray = new ArrayList<>();
booleanArray.add(TEST_ARRAY_BOOLEAN_1);
booleanArray.add(TEST_ARRAY_BOOLEAN_2);
- HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
+ HashMap<CharSequence, Long> longMap = new HashMap<>();
longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
@@ -130,6 +136,16 @@ public class AvroRecordInputFormatTest {
user1.setTypeEnum(TEST_ENUM_COLOR);
user1.setTypeMap(longMap);
user1.setTypeNested(addr);
+ user1.setTypeBytes(ByteBuffer.allocate(10));
+ user1.setTypeDate(LocalDate.parse("2014-03-01"));
+ user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
+ user1.setTypeTimeMicros(123456);
+ user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
+ user1.setTypeTimestampMicros(123456L);
+ // 20.00
+ user1.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+ // 20.00
+ user1.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
// Construct via builder
User user2 = User.newBuilder()
@@ -140,20 +156,30 @@ public class AvroRecordInputFormatTest {
.setTypeDoubleTest(1.337d)
.setTypeNullTest(null)
.setTypeLongTest(1337L)
- .setTypeArrayString(new ArrayList<CharSequence>())
- .setTypeArrayBoolean(new ArrayList<Boolean>())
+ .setTypeArrayString(new ArrayList<>())
+ .setTypeArrayBoolean(new ArrayList<>())
.setTypeNullableArray(null)
.setTypeEnum(Colors.RED)
- .setTypeMap(new HashMap<CharSequence, Long>())
+ .setTypeMap(new HashMap<>())
.setTypeFixed(null)
.setTypeUnion(null)
.setTypeNested(
Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
.setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
.build())
+ .setTypeBytes(ByteBuffer.allocate(10))
+ .setTypeDate(LocalDate.parse("2014-03-01"))
+ .setTypeTimeMillis(LocalTime.parse("12:12:12"))
+ .setTypeTimeMicros(123456)
+ .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
+ .setTypeTimestampMicros(123456L)
+ // 20.00
+ .setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+ // 20.00
+ .setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.build();
- DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
- DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
+ DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
+ DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), testFile);
dataFileWriter.append(user1);
dataFileWriter.append(user2);
@@ -167,14 +193,13 @@ public class AvroRecordInputFormatTest {
}
/**
- * Test if the AvroInputFormat is able to properly read data from an avro file.
- * @throws IOException
+ * Test if the AvroInputFormat is able to properly read data from an Avro file.
*/
@Test
- public void testDeserialisation() throws IOException {
+ public void testDeserialization() throws IOException {
Configuration parameters = new Configuration();
- AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+ AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
format.configure(parameters);
FileInputSplit[] splits = format.createInputSplits(1);
@@ -216,14 +241,13 @@ public class AvroRecordInputFormatTest {
}
/**
- * Test if the AvroInputFormat is able to properly read data from an avro file.
- * @throws IOException
+ * Test if the AvroInputFormat is able to properly read data from an Avro file.
*/
@Test
- public void testDeserialisationReuseAvroRecordFalse() throws IOException {
+ public void testDeserializationReuseAvroRecordFalse() throws IOException {
Configuration parameters = new Configuration();
- AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+ AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
format.setReuseAvroValue(false);
format.configure(parameters);
@@ -294,7 +318,7 @@ public class AvroRecordInputFormatTest {
ExecutionConfig ec = new ExecutionConfig();
assertEquals(GenericTypeInfo.class, te.getClass());
- Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<Class<?>>());
+ Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<>());
TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
@@ -327,7 +351,7 @@ public class AvroRecordInputFormatTest {
@Test
public void testDeserializeToSpecificType() throws IOException {
- DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema);
+ DatumReader<User> datumReader = new SpecificDatumReader<>(userSchema);
try (FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
User rec = dataFileReader.next();
@@ -365,15 +389,12 @@ public class AvroRecordInputFormatTest {
/**
* Test if the AvroInputFormat is able to properly read data from an Avro
* file as a GenericRecord.
- *
- * @throws IOException
*/
@Test
- public void testDeserialisationGenericRecord() throws IOException {
+ public void testDeserializationGenericRecord() throws IOException {
Configuration parameters = new Configuration();
- AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
- GenericRecord.class);
+ AvroInputFormat<GenericRecord> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), GenericRecord.class);
doTestDeserializationGenericRecord(format, parameters);
}
@@ -440,17 +461,17 @@ public class AvroRecordInputFormatTest {
* @throws IOException if there is an error
*/
@Test
- public void testDeserialisationGenericRecordReuseAvroValueFalse() throws IOException {
+ public void testDeserializationGenericRecordReuseAvroValueFalse() throws IOException {
Configuration parameters = new Configuration();
- AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
- GenericRecord.class);
+ AvroInputFormat<GenericRecord> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), GenericRecord.class);
format.configure(parameters);
format.setReuseAvroValue(false);
doTestDeserializationGenericRecord(format, parameters);
}
+ @SuppressWarnings("ResultOfMethodCallIgnored")
@After
public void deleteFiles() {
testFile.delete();
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
index 1d98c14..de50f27 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
@@ -23,8 +23,9 @@ import org.apache.flink.formats.avro.utils.AvroTestUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
import org.junit.Test;
import java.io.IOException;
@@ -37,8 +38,8 @@ import static org.junit.Assert.assertEquals;
public class AvroRowDeSerializationSchemaTest {
@Test
- public void testSerializeDeserializeSimpleRow() throws IOException {
- final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+ public void testSpecificSerializeDeserializeFromClass() throws IOException {
+ final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -50,14 +51,13 @@ public class AvroRowDeSerializationSchemaTest {
}
@Test
- public void testSerializeSimpleRowSeveralTimes() throws IOException {
- final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+ public void testSpecificSerializeDeserializeFromSchema() throws IOException {
+ final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
+ final String schemaString = testData.f1.getSchema().toString();
- final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
- final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+ final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(schemaString);
+ final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(schemaString);
- serializationSchema.serialize(testData.f2);
- serializationSchema.serialize(testData.f2);
final byte[] bytes = serializationSchema.serialize(testData.f2);
final Row actual = deserializationSchema.deserialize(bytes);
@@ -65,27 +65,27 @@ public class AvroRowDeSerializationSchemaTest {
}
@Test
- public void testDeserializeRowSeveralTimes() throws IOException {
- final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+ public void testGenericSerializeDeserialize() throws IOException {
+ final Tuple3<GenericRecord, Row, Schema> testData = AvroTestUtils.getGenericTestData();
- final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
- final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+ final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f2.toString());
+ final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f2.toString());
- final byte[] bytes = serializationSchema.serialize(testData.f2);
- deserializationSchema.deserialize(bytes);
- deserializationSchema.deserialize(bytes);
+ final byte[] bytes = serializationSchema.serialize(testData.f1);
final Row actual = deserializationSchema.deserialize(bytes);
- assertEquals(testData.f2, actual);
+ assertEquals(testData.f1, actual);
}
@Test
- public void testSerializeDeserializeComplexRow() throws IOException {
- final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+ public void testSpecificSerializeFromClassSeveralTimes() throws IOException {
+ final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+ serializationSchema.serialize(testData.f2);
+ serializationSchema.serialize(testData.f2);
final byte[] bytes = serializationSchema.serialize(testData.f2);
final Row actual = deserializationSchema.deserialize(bytes);
@@ -93,11 +93,12 @@ public class AvroRowDeSerializationSchemaTest {
}
@Test
- public void testSerializeComplexRowSeveralTimes() throws IOException {
- final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+ public void testSpecificSerializeFromSchemaSeveralTimes() throws IOException {
+ final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
+ final String schemaString = testData.f1.getSchema().toString();
- final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
- final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+ final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(schemaString);
+ final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(schemaString);
serializationSchema.serialize(testData.f2);
serializationSchema.serialize(testData.f2);
@@ -108,8 +109,23 @@ public class AvroRowDeSerializationSchemaTest {
}
@Test
- public void testDeserializeComplexRowSeveralTimes() throws IOException {
- final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+ public void testGenericSerializeSeveralTimes() throws IOException {
+ final Tuple3<GenericRecord, Row, Schema> testData = AvroTestUtils.getGenericTestData();
+
+ final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f2.toString());
+ final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f2.toString());
+
+ serializationSchema.serialize(testData.f1);
+ serializationSchema.serialize(testData.f1);
+ final byte[] bytes = serializationSchema.serialize(testData.f1);
+ final Row actual = deserializationSchema.deserialize(bytes);
+
+ assertEquals(testData.f1, actual);
+ }
+
+ @Test
+ public void testSpecificDeserializeFromClassSeveralTimes() throws IOException {
+ final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -123,25 +139,66 @@ public class AvroRowDeSerializationSchemaTest {
}
@Test
- public void testSerializability() throws IOException, ClassNotFoundException {
- final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+ public void testSpecificDeserializeFromSchemaSeveralTimes() throws IOException {
+ final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
+ final String schemaString = testData.f1.getSchema().toString();
- final AvroRowSerializationSchema serOrig = new AvroRowSerializationSchema(testData.f0);
- final AvroRowDeserializationSchema deserOrig = new AvroRowDeserializationSchema(testData.f0);
+ final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(schemaString);
+ final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(schemaString);
- byte[] serBytes = InstantiationUtil.serializeObject(serOrig);
- byte[] deserBytes = InstantiationUtil.serializeObject(deserOrig);
+ final byte[] bytes = serializationSchema.serialize(testData.f2);
+ deserializationSchema.deserialize(bytes);
+ deserializationSchema.deserialize(bytes);
+ final Row actual = deserializationSchema.deserialize(bytes);
- AvroRowSerializationSchema serCopy =
+ assertEquals(testData.f2, actual);
+ }
+
+ @Test
+ public void testGenericDeserializeSeveralTimes() throws IOException {
+ final Tuple3<GenericRecord, Row, Schema> testData = AvroTestUtils.getGenericTestData();
+
+ final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f2.toString());
+ final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f2.toString());
+
+ final byte[] bytes = serializationSchema.serialize(testData.f1);
+ deserializationSchema.deserialize(bytes);
+ deserializationSchema.deserialize(bytes);
+ final Row actual = deserializationSchema.deserialize(bytes);
+
+ assertEquals(testData.f1, actual);
+ }
+
+ @Test
+ public void testSerializability() throws Exception {
+ final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
+ final String schemaString = testData.f1.getSchema().toString();
+
+ // from class
+ final AvroRowSerializationSchema classSer = new AvroRowSerializationSchema(testData.f0);
+ final AvroRowDeserializationSchema classDeser = new AvroRowDeserializationSchema(testData.f0);
+ testSerializability(classSer, classDeser, testData.f2);
+
+ // from schema string
+ final AvroRowSerializationSchema schemaSer = new AvroRowSerializationSchema(schemaString);
+ final AvroRowDeserializationSchema schemaDeser = new AvroRowDeserializationSchema(schemaString);
+ testSerializability(schemaSer, schemaDeser, testData.f2);
+ }
+
+ private void testSerializability(AvroRowSerializationSchema ser, AvroRowDeserializationSchema deser, Row data) throws Exception {
+ final byte[] serBytes = InstantiationUtil.serializeObject(ser);
+ final byte[] deserBytes = InstantiationUtil.serializeObject(deser);
+
+ final AvroRowSerializationSchema serCopy =
InstantiationUtil.deserializeObject(serBytes, Thread.currentThread().getContextClassLoader());
- AvroRowDeserializationSchema deserCopy =
+ final AvroRowDeserializationSchema deserCopy =
InstantiationUtil.deserializeObject(deserBytes, Thread.currentThread().getContextClassLoader());
- final byte[] bytes = serCopy.serialize(testData.f2);
+ final byte[] bytes = serCopy.serialize(data);
deserCopy.deserialize(bytes);
deserCopy.deserialize(bytes);
final Row actual = deserCopy.deserialize(bytes);
- assertEquals(testData.f2, actual);
+ assertEquals(data, actual);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
index 40a84f9..fee81a8 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
@@ -25,11 +25,15 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -37,6 +41,8 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Random;
@@ -67,7 +73,7 @@ public class AvroSplittableInputFormatTest {
static final String TEST_MAP_KEY2 = "KEY 2";
static final long TEST_MAP_VALUE2 = 17554L;
- static final Integer TEST_NUM = new Integer(239);
+ static final Integer TEST_NUM = 239;
static final String TEST_STREET = "Baker Street";
static final String TEST_CITY = "London";
static final String TEST_STATE = "London";
@@ -79,20 +85,20 @@ public class AvroSplittableInputFormatTest {
public void createFiles() throws IOException {
testFile = File.createTempFile("AvroSplittableInputFormatTest", null);
- ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
+ ArrayList<CharSequence> stringArray = new ArrayList<>();
stringArray.add(TEST_ARRAY_STRING_1);
stringArray.add(TEST_ARRAY_STRING_2);
- ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
+ ArrayList<Boolean> booleanArray = new ArrayList<>();
booleanArray.add(TEST_ARRAY_BOOLEAN_1);
booleanArray.add(TEST_ARRAY_BOOLEAN_2);
- HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
+ HashMap<CharSequence, Long> longMap = new HashMap<>();
longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
Address addr = new Address();
- addr.setNum(new Integer(TEST_NUM));
+ addr.setNum(TEST_NUM);
addr.setStreet(TEST_STREET);
addr.setCity(TEST_CITY);
addr.setState(TEST_STATE);
@@ -108,6 +114,16 @@ public class AvroSplittableInputFormatTest {
user1.setTypeEnum(TEST_ENUM_COLOR);
user1.setTypeMap(longMap);
user1.setTypeNested(addr);
+ user1.setTypeBytes(ByteBuffer.allocate(10));
+ user1.setTypeDate(LocalDate.parse("2014-03-01"));
+ user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
+ user1.setTypeTimeMicros(123456);
+ user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
+ user1.setTypeTimestampMicros(123456L);
+ // 20.00
+ user1.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+ // 20.00
+ user1.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
// Construct via builder
User user2 = User.newBuilder()
@@ -118,20 +134,30 @@ public class AvroSplittableInputFormatTest {
.setTypeDoubleTest(1.337d)
.setTypeNullTest(null)
.setTypeLongTest(1337L)
- .setTypeArrayString(new ArrayList<CharSequence>())
- .setTypeArrayBoolean(new ArrayList<Boolean>())
+ .setTypeArrayString(new ArrayList<>())
+ .setTypeArrayBoolean(new ArrayList<>())
.setTypeNullableArray(null)
.setTypeEnum(Colors.RED)
- .setTypeMap(new HashMap<CharSequence, Long>())
+ .setTypeMap(new HashMap<>())
.setTypeFixed(new Fixed16())
.setTypeUnion(123L)
.setTypeNested(
Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
.setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
.build())
+ .setTypeBytes(ByteBuffer.allocate(10))
+ .setTypeDate(LocalDate.parse("2014-03-01"))
+ .setTypeTimeMillis(LocalTime.parse("12:12:12"))
+ .setTypeTimeMicros(123456)
+ .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
+ .setTypeTimestampMicros(123456L)
+ // 20.00
+ .setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+ // 20.00
+ .setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.build();
- DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
- DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
+ DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
+ DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), testFile);
dataFileWriter.append(user1);
dataFileWriter.append(user2);
@@ -148,12 +174,22 @@ public class AvroSplittableInputFormatTest {
user.setTypeEnum(TEST_ENUM_COLOR);
user.setTypeMap(longMap);
Address address = new Address();
- address.setNum(new Integer(TEST_NUM));
+ address.setNum(TEST_NUM);
address.setStreet(TEST_STREET);
address.setCity(TEST_CITY);
address.setState(TEST_STATE);
address.setZip(TEST_ZIP);
user.setTypeNested(address);
+ user.setTypeBytes(ByteBuffer.allocate(10));
+ user.setTypeDate(LocalDate.parse("2014-03-01"));
+ user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
+ user.setTypeTimeMicros(123456);
+ user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
+ user.setTypeTimestampMicros(123456L);
+ // 20.00
+ user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+ // 20.00
+ user.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
dataFileWriter.append(user);
}
@@ -164,7 +200,7 @@ public class AvroSplittableInputFormatTest {
public void testSplittedIF() throws IOException {
Configuration parameters = new Configuration();
- AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+ AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
format.configure(parameters);
FileInputSplit[] splits = format.createInputSplits(4);
@@ -182,10 +218,10 @@ public class AvroSplittableInputFormatTest {
format.close();
}
- Assert.assertEquals(1539, elementsPerSplit[0]);
- Assert.assertEquals(1026, elementsPerSplit[1]);
- Assert.assertEquals(1539, elementsPerSplit[2]);
- Assert.assertEquals(896, elementsPerSplit[3]);
+ Assert.assertEquals(1604, elementsPerSplit[0]);
+ Assert.assertEquals(1203, elementsPerSplit[1]);
+ Assert.assertEquals(1203, elementsPerSplit[2]);
+ Assert.assertEquals(990, elementsPerSplit[3]);
Assert.assertEquals(NUM_RECORDS, elements);
format.close();
}
@@ -196,7 +232,7 @@ public class AvroSplittableInputFormatTest {
Configuration parameters = new Configuration();
- AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+ AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
format.configure(parameters);
FileInputSplit[] splits = format.createInputSplits(4);
@@ -228,10 +264,10 @@ public class AvroSplittableInputFormatTest {
format.close();
}
- Assert.assertEquals(1539, elementsPerSplit[0]);
- Assert.assertEquals(1026, elementsPerSplit[1]);
- Assert.assertEquals(1539, elementsPerSplit[2]);
- Assert.assertEquals(896, elementsPerSplit[3]);
+ Assert.assertEquals(1604, elementsPerSplit[0]);
+ Assert.assertEquals(1203, elementsPerSplit[1]);
+ Assert.assertEquals(1203, elementsPerSplit[2]);
+ Assert.assertEquals(990, elementsPerSplit[3]);
Assert.assertEquals(NUM_RECORDS, elements);
format.close();
}
@@ -242,7 +278,7 @@ public class AvroSplittableInputFormatTest {
Configuration parameters = new Configuration();
- AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+ AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
format.configure(parameters);
FileInputSplit[] splits = format.createInputSplits(4);
@@ -274,10 +310,10 @@ public class AvroSplittableInputFormatTest {
format.close();
}
- Assert.assertEquals(1539, elementsPerSplit[0]);
- Assert.assertEquals(1026, elementsPerSplit[1]);
- Assert.assertEquals(1539, elementsPerSplit[2]);
- Assert.assertEquals(896, elementsPerSplit[3]);
+ Assert.assertEquals(1604, elementsPerSplit[0]);
+ Assert.assertEquals(1203, elementsPerSplit[1]);
+ Assert.assertEquals(1203, elementsPerSplit[2]);
+ Assert.assertEquals(990, elementsPerSplit[3]);
Assert.assertEquals(NUM_RECORDS, elements);
format.close();
}
@@ -287,11 +323,23 @@ public class AvroSplittableInputFormatTest {
This dependency needs to be added
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-mapred</artifactId>
- <version>1.7.6</version>
- </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <version>1.7.6</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-hadoop-compatibility_2.11</artifactId>
+ <version>1.6-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>16.0</version>
+ </dependency>
@Test
public void testHadoop() throws Exception {
@@ -314,10 +362,11 @@ public class AvroSplittableInputFormatTest {
}
i++;
}
- System.out.println("Status "+Arrays.toString(elementsPerSplit));
- } **/
+ System.out.println("Status " + Arrays.toString(elementsPerSplit));
+ } */
@After
+ @SuppressWarnings("ResultOfMethodCallIgnored")
public void deleteFiles() {
testFile.delete();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
index 87e169b..49ef985 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.avro;
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.formats.avro.utils.DataInputDecoder;
import org.apache.flink.formats.avro.utils.DataOutputEncoder;
@@ -28,12 +29,17 @@ import org.apache.flink.util.StringUtils;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -49,6 +55,7 @@ import static org.junit.Assert.fail;
* Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization.
*/
public class EncoderDecoderTest {
+
@Test
public void testComplexStringsDirecty() {
try {
@@ -93,56 +100,56 @@ public class EncoderDecoderTest {
@Test
public void testPrimitiveTypes() {
- testObjectSerialization(new Boolean(true));
- testObjectSerialization(new Boolean(false));
-
- testObjectSerialization(Byte.valueOf((byte) 0));
- testObjectSerialization(Byte.valueOf((byte) 1));
- testObjectSerialization(Byte.valueOf((byte) -1));
- testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE));
- testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE));
-
- testObjectSerialization(Short.valueOf((short) 0));
- testObjectSerialization(Short.valueOf((short) 1));
- testObjectSerialization(Short.valueOf((short) -1));
- testObjectSerialization(Short.valueOf(Short.MIN_VALUE));
- testObjectSerialization(Short.valueOf(Short.MAX_VALUE));
-
- testObjectSerialization(Integer.valueOf(0));
- testObjectSerialization(Integer.valueOf(1));
- testObjectSerialization(Integer.valueOf(-1));
- testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE));
- testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE));
-
- testObjectSerialization(Long.valueOf(0));
- testObjectSerialization(Long.valueOf(1));
- testObjectSerialization(Long.valueOf(-1));
- testObjectSerialization(Long.valueOf(Long.MIN_VALUE));
- testObjectSerialization(Long.valueOf(Long.MAX_VALUE));
-
- testObjectSerialization(Float.valueOf(0));
- testObjectSerialization(Float.valueOf(1));
- testObjectSerialization(Float.valueOf(-1));
- testObjectSerialization(Float.valueOf((float) Math.E));
- testObjectSerialization(Float.valueOf((float) Math.PI));
- testObjectSerialization(Float.valueOf(Float.MIN_VALUE));
- testObjectSerialization(Float.valueOf(Float.MAX_VALUE));
- testObjectSerialization(Float.valueOf(Float.MIN_NORMAL));
- testObjectSerialization(Float.valueOf(Float.NaN));
- testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY));
- testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY));
-
- testObjectSerialization(Double.valueOf(0));
- testObjectSerialization(Double.valueOf(1));
- testObjectSerialization(Double.valueOf(-1));
- testObjectSerialization(Double.valueOf(Math.E));
- testObjectSerialization(Double.valueOf(Math.PI));
- testObjectSerialization(Double.valueOf(Double.MIN_VALUE));
- testObjectSerialization(Double.valueOf(Double.MAX_VALUE));
- testObjectSerialization(Double.valueOf(Double.MIN_NORMAL));
- testObjectSerialization(Double.valueOf(Double.NaN));
- testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY));
- testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY));
+ testObjectSerialization(Boolean.TRUE);
+ testObjectSerialization(Boolean.FALSE);
+
+ testObjectSerialization((byte) 0);
+ testObjectSerialization((byte) 1);
+ testObjectSerialization((byte) -1);
+ testObjectSerialization(Byte.MIN_VALUE);
+ testObjectSerialization(Byte.MAX_VALUE);
+
+ testObjectSerialization((short) 0);
+ testObjectSerialization((short) 1);
+ testObjectSerialization((short) -1);
+ testObjectSerialization(Short.MIN_VALUE);
+ testObjectSerialization(Short.MAX_VALUE);
+
+ testObjectSerialization(0);
+ testObjectSerialization(1);
+ testObjectSerialization(-1);
+ testObjectSerialization(Integer.MIN_VALUE);
+ testObjectSerialization(Integer.MAX_VALUE);
+
+ testObjectSerialization(0L);
+ testObjectSerialization(1L);
+ testObjectSerialization((long) -1);
+ testObjectSerialization(Long.MIN_VALUE);
+ testObjectSerialization(Long.MAX_VALUE);
+
+ testObjectSerialization(0f);
+ testObjectSerialization(1f);
+ testObjectSerialization((float) -1);
+ testObjectSerialization((float) Math.E);
+ testObjectSerialization((float) Math.PI);
+ testObjectSerialization(Float.MIN_VALUE);
+ testObjectSerialization(Float.MAX_VALUE);
+ testObjectSerialization(Float.MIN_NORMAL);
+ testObjectSerialization(Float.NaN);
+ testObjectSerialization(Float.NEGATIVE_INFINITY);
+ testObjectSerialization(Float.POSITIVE_INFINITY);
+
+ testObjectSerialization(0d);
+ testObjectSerialization(1d);
+ testObjectSerialization((double) -1);
+ testObjectSerialization(Math.E);
+ testObjectSerialization(Math.PI);
+ testObjectSerialization(Double.MIN_VALUE);
+ testObjectSerialization(Double.MAX_VALUE);
+ testObjectSerialization(Double.MIN_NORMAL);
+ testObjectSerialization(Double.NaN);
+ testObjectSerialization(Double.NEGATIVE_INFINITY);
+ testObjectSerialization(Double.POSITIVE_INFINITY);
testObjectSerialization("");
testObjectSerialization("abcdefg");
@@ -209,7 +216,7 @@ public class EncoderDecoderTest {
// object with collection
{
- ArrayList<String> list = new ArrayList<String>();
+ ArrayList<String> list = new ArrayList<>();
list.add("A");
list.add("B");
list.add("C");
@@ -221,7 +228,7 @@ public class EncoderDecoderTest {
// object with empty collection
{
- ArrayList<String> list = new ArrayList<String>();
+ ArrayList<String> list = new ArrayList<>();
testObjectSerialization(new BookAuthor(987654321L, list, "The Saurus"));
}
}
@@ -235,7 +242,7 @@ public class EncoderDecoderTest {
public void testGeneratedObjectWithNullableFields() {
List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" });
List<Boolean> bools = Arrays.asList(true, true, false, false, true, false, true, true);
- Map<CharSequence, Long> map = new HashMap<CharSequence, Long>();
+ Map<CharSequence, Long> map = new HashMap<>();
map.put("1", 1L);
map.put("2", 2L);
map.put("3", 3L);
@@ -243,11 +250,31 @@ public class EncoderDecoderTest {
byte[] b = new byte[16];
new Random().nextBytes(b);
Fixed16 f = new Fixed16(b);
- Address addr = new Address(new Integer(239), "6th Main", "Bangalore",
- "Karnataka", "560075");
- User user = new User("Freudenreich", 1337, "macintosh gray",
- 1234567890L, 3.1415926, null, true, strings, bools, null,
- Colors.GREEN, map, f, new Boolean(true), addr);
+ Address addr = new Address(239, "6th Main", "Bangalore", "Karnataka", "560075");
+ User user = new User(
+ "Freudenreich",
+ 1337,
+ "macintosh gray",
+ 1234567890L,
+ 3.1415926,
+ null,
+ true,
+ strings,
+ bools,
+ null,
+ Colors.GREEN,
+ map,
+ f,
+ Boolean.TRUE,
+ addr,
+ ByteBuffer.wrap(b),
+ LocalDate.parse("2014-03-01"),
+ LocalTime.parse("12:12:12"),
+ 123456,
+ DateTime.parse("2014-03-01T12:12:12.321Z"),
+ 123456L,
+ ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()), // 20.00
+ new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); // 20.00
testObjectSerialization(user);
}
@@ -301,7 +328,7 @@ public class EncoderDecoderTest {
@SuppressWarnings("unchecked")
Class<X> clazz = (Class<X>) obj.getClass();
- ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz);
+ ReflectDatumWriter<X> writer = new ReflectDatumWriter<>(clazz);
writer.write(obj, encoder);
dataOut.flush();
@@ -309,7 +336,7 @@ public class EncoderDecoderTest {
}
byte[] data = baos.toByteArray();
- X result = null;
+ X result;
// deserialize
{
@@ -320,7 +347,7 @@ public class EncoderDecoderTest {
@SuppressWarnings("unchecked")
Class<X> clazz = (Class<X>) obj.getClass();
- ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz);
+ ReflectDatumReader<X> reader = new ReflectDatumReader<>(clazz);
// create a reuse object if possible, otherwise we have no reuse object
X reuse = null;
@@ -328,7 +355,9 @@ public class EncoderDecoderTest {
@SuppressWarnings("unchecked")
X test = (X) obj.getClass().newInstance();
reuse = test;
- } catch (Throwable t) {}
+ } catch (Throwable t) {
+ // do nothing
+ }
result = reader.read(reuse, decoder);
}
@@ -427,7 +456,7 @@ public class EncoderDecoderTest {
public ComplexNestedObject1(int offInit) {
this.doubleValue = 6293485.6723 + offInit;
- this.stringList = new ArrayList<String>();
+ this.stringList = new ArrayList<>();
this.stringList.add("A" + offInit);
this.stringList.add("somewhat" + offInit);
this.stringList.add("random" + offInit);
@@ -458,7 +487,7 @@ public class EncoderDecoderTest {
public ComplexNestedObject2(boolean init) {
this.longValue = 46547;
- this.theMap = new HashMap<String, ComplexNestedObject1>();
+ this.theMap = new HashMap<>();
this.theMap.put("36354L", new ComplexNestedObject1(43546543));
this.theMap.put("785611L", new ComplexNestedObject1(45784568));
this.theMap.put("43L", new ComplexNestedObject1(9876543));
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
new file mode 100644
index 0000000..be0ddc4
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link AvroSchemaConverter}: the class-based and the schema-string-based conversion of {@link User} are checked against the same expected row type.
+ */
+public class AvroSchemaConverterTest {
+
+ @Test
+ public void testAvroClassConversion() { // conversion driven by the generated record class
+ validateUserSchema(AvroSchemaConverter.convertToTypeInfo(User.class));
+ }
+
+ @Test
+ public void testAvroSchemaConversion() { // conversion driven by the record's schema rendered as a string
+ final String schema = User.getClassSchema().toString(true);
+ validateUserSchema(AvroSchemaConverter.convertToTypeInfo(schema));
+ }
+
+ private void validateUserSchema(TypeInformation<?> actual) { // shared check: compares 'actual' with the hand-written expected row type below
+ final TypeInformation<Row> address = Types.ROW_NAMED( // expected type of the nested record, used for "type_nested" further down
+ new String[]{
+ "num",
+ "street",
+ "city",
+ "state",
+ "zip"},
+ Types.INT, // num
+ Types.STRING, // street
+ Types.STRING, // city
+ Types.STRING, // state
+ Types.STRING); // zip
+
+ final TypeInformation<Row> user = Types.ROW_NAMED( // field names first, then one type per name in the same order
+ new String[] {
+ "name",
+ "favorite_number",
+ "favorite_color",
+ "type_long_test",
+ "type_double_test",
+ "type_null_test",
+ "type_bool_test",
+ "type_array_string",
+ "type_array_boolean",
+ "type_nullable_array",
+ "type_enum",
+ "type_map",
+ "type_fixed",
+ "type_union",
+ "type_nested",
+ "type_bytes",
+ "type_date",
+ "type_time_millis",
+ "type_time_micros",
+ "type_timestamp_millis",
+ "type_timestamp_micros",
+ "type_decimal_bytes",
+ "type_decimal_fixed"},
+ Types.STRING, // name
+ Types.INT, // favorite_number
+ Types.STRING, // favorite_color
+ Types.LONG, // type_long_test
+ Types.DOUBLE, // type_double_test
+ Types.VOID, // type_null_test
+ Types.BOOLEAN, // type_bool_test
+ Types.OBJECT_ARRAY(Types.STRING), // type_array_string
+ Types.OBJECT_ARRAY(Types.BOOLEAN), // type_array_boolean
+ Types.OBJECT_ARRAY(Types.STRING), // type_nullable_array
+ Types.STRING, // type_enum: expected as a plain string
+ Types.MAP(Types.STRING, Types.LONG), // type_map
+ Types.PRIMITIVE_ARRAY(Types.BYTE), // type_fixed: expected as byte[]
+ Types.GENERIC(Object.class), // type_union: expected as a generic Object type
+ address, // type_nested
+ Types.PRIMITIVE_ARRAY(Types.BYTE), // type_bytes
+ Types.SQL_DATE, // type_date
+ Types.SQL_TIME, // type_time_millis
+ Types.INT, // type_time_micros: expected as a plain int, not a SQL time type
+ Types.SQL_TIMESTAMP, // type_timestamp_millis
+ Types.LONG, // type_timestamp_micros: expected as a plain long, not a SQL timestamp type
+ Types.BIG_DEC, // type_decimal_bytes
+ Types.BIG_DEC); // type_decimal_fixed
+
+ assertEquals(user, actual); // expected value first, converter output second
+
+ final RowTypeInfo userRowInfo = (RowTypeInfo) user; // cast needed because schemaEquals is called on RowTypeInfo
+ assertTrue(userRowInfo.schemaEquals(actual)); // NOTE(review): presumably a stricter check that also covers field names - confirm in RowTypeInfo
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
index fbabb95..ccba0a5 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.avro.typeutils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
@@ -31,7 +32,6 @@ import org.apache.flink.formats.avro.AvroRecordInputFormatTest;
import org.apache.flink.formats.avro.generated.Fixed16;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Assert;
@@ -52,6 +52,7 @@ import java.util.Map;
*/
@RunWith(Parameterized.class)
public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
+
public AvroTypeExtractionTest(TestExecutionMode mode) {
super(mode);
}
@@ -80,7 +81,7 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Path in = new Path(inFile.getAbsoluteFile().toURI());
- AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+ AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
DataSet<User> usersDS = env.createInput(users)
.map((value) -> value);
@@ -88,8 +89,19 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
env.execute("Simple Avro read job");
- expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
- "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
+ expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, " +
+ "\"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], " +
+ "\"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, " +
+ "\"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, " +
+ "\"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, \"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, " +
+ "\"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, " +
+ "\"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n" +
+ "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, " +
+ "\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, " +
+ "\"type_enum\": \"RED\", \"type_map\": {}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", " +
+ "\"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, \"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, " +
+ "\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, \"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
+ "\"type_timestamp_micros\": 123456, \"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n";
}
@Test
@@ -98,24 +110,31 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
env.getConfig().enableForceAvro();
Path in = new Path(inFile.getAbsoluteFile().toURI());
- AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+ AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
DataSet<User> usersDS = env.createInput(users)
- .map(new MapFunction<User, User>() {
- @Override
- public User map(User value) throws Exception {
- Map<CharSequence, Long> ab = new HashMap<CharSequence, Long>(1);
- ab.put("hehe", 12L);
- value.setTypeMap(ab);
- return value;
- }
+ .map((MapFunction<User, User>) value -> {
+ Map<CharSequence, Long> ab = new HashMap<>(1);
+ ab.put("hehe", 12L);
+ value.setTypeMap(ab);
+ return value;
});
usersDS.writeAsText(resultPath);
env.execute("Simple Avro read job");
- expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
- "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
+ expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, " +
+ "\"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, " +
+ "\"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", " +
+ "\"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, \"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, " +
+ "\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, \"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, " +
+ "\"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n" +
+ "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, " +
+ "\"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, " +
+ "\"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, " +
+ "\"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, \"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, " +
+ "\"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, \"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, " +
+ "\"type_decimal_fixed\": [7, -48]}\n";
}
@@ -125,17 +144,17 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
env.getConfig().enableObjectReuse();
Path in = new Path(inFile.getAbsoluteFile().toURI());
- AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+ AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
DataSet<User> usersDS = env.createInput(users);
- DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
- @Override
- public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+ DataSet<Tuple2<String, Integer>> res = usersDS
+ .groupBy("name")
+ .reduceGroup((GroupReduceFunction<User, Tuple2<String, Integer>>) (values, out) -> {
for (User u : values) {
- out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+ out.collect(new Tuple2<>(u.getName().toString(), 1));
}
- }
- });
+ })
+ .returns(Types.TUPLE(Types.STRING, Types.INT));
res.writeAsText(resultPath);
env.execute("Avro Key selection");
@@ -148,22 +167,17 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
env.getConfig().enableForceAvro();
Path in = new Path(inFile.getAbsoluteFile().toURI());
- AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+ AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
DataSet<User> usersDS = env.createInput(users);
- DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() {
- @Override
- public String getKey(User value) throws Exception {
- return String.valueOf(value.getName());
- }
- }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
- @Override
- public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+ DataSet<Tuple2<String, Integer>> res = usersDS
+ .groupBy((KeySelector<User, String>) value -> String.valueOf(value.getName()))
+ .reduceGroup((GroupReduceFunction<User, Tuple2<String, Integer>>) (values, out) -> {
for (User u : values) {
- out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+ out.collect(new Tuple2<>(u.getName().toString(), 1));
}
- }
- });
+ })
+ .returns(Types.TUPLE(Types.STRING, Types.INT));
res.writeAsText(resultPath);
env.execute("Avro Key selection");
@@ -177,22 +191,17 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
env.getConfig().enableForceKryo();
Path in = new Path(inFile.getAbsoluteFile().toURI());
- AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+ AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
DataSet<User> usersDS = env.createInput(users);
- DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() {
- @Override
- public String getKey(User value) throws Exception {
- return String.valueOf(value.getName());
- }
- }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
- @Override
- public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+ DataSet<Tuple2<String, Integer>> res = usersDS
+ .groupBy((KeySelector<User, String>) value -> String.valueOf(value.getName()))
+ .reduceGroup((GroupReduceFunction<User, Tuple2<String, Integer>>) (values, out) -> {
for (User u : values) {
- out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+ out.collect(new Tuple2<>(u.getName().toString(), 1));
}
- }
- });
+ })
+ .returns(Types.TUPLE(Types.STRING, Types.INT));
res.writeAsText(resultPath);
env.execute("Avro Key selection");
@@ -216,17 +225,17 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Path in = new Path(inFile.getAbsoluteFile().toURI());
- AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+ AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
DataSet<User> usersDS = env.createInput(users);
- DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
- @Override
- public void reduce(Iterable<User> values, Collector<Object> out) throws Exception {
+ DataSet<Object> res = usersDS
+ .groupBy(fieldName)
+ .reduceGroup((GroupReduceFunction<User, Object>) (values, out) -> {
for (User u : values) {
out.collect(u.get(fieldName));
}
- }
- });
+ })
+ .returns(Object.class);
res.writeAsText(resultPath);
env.execute("Simple Avro read job");
@@ -234,14 +243,19 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
ExecutionConfig ec = env.getConfig();
Assert.assertTrue(ec.getRegisteredKryoTypes().contains(Fixed16.class));
- if (fieldName.equals("name")) {
- expected = "Alyssa\nCharlie";
- } else if (fieldName.equals("type_enum")) {
- expected = "GREEN\nRED\n";
- } else if (fieldName.equals("type_double_test")) {
- expected = "123.45\n1.337\n";
- } else {
- Assert.fail("Unknown field");
+ switch (fieldName) {
+ case "name":
+ expected = "Alyssa\nCharlie";
+ break;
+ case "type_enum":
+ expected = "GREEN\nRED\n";
+ break;
+ case "type_double_test":
+ expected = "123.45\n1.337\n";
+ break;
+ default:
+ Assert.fail("Unknown field");
+ break;
}
after();
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
index f641636..cfd1506 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.generated.SimpleUser;
import org.apache.flink.formats.avro.utils.TestDataGenerator;
import org.junit.Test;
@@ -51,12 +51,16 @@ import static org.junit.Assert.assertTrue;
* works properly.
*
* <p>This test can be dropped once we drop backwards compatibility with Flink 1.3 snapshots.
+ *
+ * <p>The {@link BackwardsCompatibleAvroSerializer} does not support custom Kryo registrations (which
+ * logical types require for Avro 1.8 because Kryo does not support Joda-Time). We introduced a
+ * simpler user record for pre-Avro 1.8 test cases.
*/
public class BackwardsCompatibleAvroSerializerTest {
- private static final String SNAPSHOT_RESOURCE = "flink-1.3-avro-type-serializer-snapshot";
+ private static final String SNAPSHOT_RESOURCE = "flink-1.6-avro-type-serializer-snapshot";
- private static final String DATA_RESOURCE = "flink-1.3-avro-type-serialized-data";
+ private static final String DATA_RESOURCE = "flink-1.6-avro-type-serialized-data";
@SuppressWarnings("unused")
private static final String SNAPSHOT_RESOURCE_WRITER = "/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" + SNAPSHOT_RESOURCE;
@@ -73,7 +77,7 @@ public class BackwardsCompatibleAvroSerializerTest {
// retrieve the old config snapshot
- final TypeSerializer<User> serializer;
+ final TypeSerializer<SimpleUser> serializer;
final TypeSerializerConfigSnapshot configSnapshot;
try (InputStream in = getClass().getClassLoader().getResourceAsStream(SNAPSHOT_RESOURCE)) {
@@ -86,7 +90,7 @@ public class BackwardsCompatibleAvroSerializerTest {
assertEquals(1, deserialized.size());
@SuppressWarnings("unchecked")
- final TypeSerializer<User> typedSerializer = (TypeSerializer<User>) deserialized.get(0).f0;
+ final TypeSerializer<SimpleUser> typedSerializer = (TypeSerializer<SimpleUser>) deserialized.get(0).f0;
serializer = typedSerializer;
configSnapshot = deserialized.get(0).f1;
@@ -104,14 +108,14 @@ public class BackwardsCompatibleAvroSerializerTest {
// sanity check for the test: check that a PoJoSerializer and the original serializer work together
assertFalse(serializer.ensureCompatibility(configSnapshot).isRequiresMigration());
- final TypeSerializer<User> newSerializer = new AvroTypeInfo<>(User.class, true).createSerializer(new ExecutionConfig());
+ final TypeSerializer<SimpleUser> newSerializer = new AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
assertFalse(newSerializer.ensureCompatibility(configSnapshot).isRequiresMigration());
// deserialize the data and make sure this still works
validateDeserialization(newSerializer);
TypeSerializerConfigSnapshot nextSnapshot = newSerializer.snapshotConfiguration();
- final TypeSerializer<User> nextSerializer = new AvroTypeInfo<>(User.class, true).createSerializer(new ExecutionConfig());
+ final TypeSerializer<SimpleUser> nextSerializer = new AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
assertFalse(nextSerializer.ensureCompatibility(nextSnapshot).isRequiresMigration());
@@ -119,7 +123,7 @@ public class BackwardsCompatibleAvroSerializerTest {
validateDeserialization(nextSerializer);
}
- private static void validateDeserialization(TypeSerializer<User> serializer) throws IOException {
+ private static void validateDeserialization(TypeSerializer<SimpleUser> serializer) throws IOException {
final Random rnd = new Random(RANDOM_SEED);
try (InputStream in = BackwardsCompatibleAvroSerializerTest.class.getClassLoader()
@@ -128,10 +132,10 @@ public class BackwardsCompatibleAvroSerializerTest {
final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(in);
for (int i = 0; i < NUM_DATA_ENTRIES; i++) {
- final User deserialized = serializer.deserialize(inView);
+ final SimpleUser deserialized = serializer.deserialize(inView);
// deterministically generate a reference record
- final User reference = TestDataGenerator.generateRandomUser(rnd);
+ final SimpleUser reference = TestDataGenerator.generateRandomSimpleUser(rnd);
assertEquals(reference, deserialized);
}
@@ -141,9 +145,9 @@ public class BackwardsCompatibleAvroSerializerTest {
// run this code to generate the test data
// public static void main(String[] args) throws Exception {
//
-// AvroTypeInfo<User> typeInfo = new AvroTypeInfo<>(User.class);
+// AvroTypeInfo<SimpleUser> typeInfo = new AvroTypeInfo<>(SimpleUser.class);
//
-// TypeSerializer<User> serializer = typeInfo.createPojoSerializer(new ExecutionConfig());
+// TypeSerializer<SimpleUser> serializer = typeInfo.createPojoSerializer(new ExecutionConfig());
// TypeSerializerConfigSnapshot confSnapshot = serializer.snapshotConfiguration();
//
// try (FileOutputStream fos = new FileOutputStream(SNAPSHOT_RESOURCE_WRITER)) {
@@ -160,7 +164,7 @@ public class BackwardsCompatibleAvroSerializerTest {
// final Random rnd = new Random(RANDOM_SEED);
//
// for (int i = 0; i < NUM_DATA_ENTRIES; i++) {
-// serializer.serialize(TestDataGenerator.generateRandomUser(rnd), out);
+// serializer.serialize(TestDataGenerator.generateRandomSimpleUser(rnd), out);
// }
// }
// }
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
index ce23ccc..9d77f32 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -18,87 +18,44 @@
package org.apache.flink.formats.avro.utils;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.types.Row;
import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.reflect.ReflectData;
import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
/**
- * Utilities for creating Avro Schemas.
+ * Utilities for creating Avro test records (specific and generic) and for serializing them.
*/
public final class AvroTestUtils {
- private static final String NAMESPACE = "org.apache.flink.streaming.connectors.kafka";
-
- /**
- * Creates a flat Avro Schema for testing.
- */
- public static Schema createFlatAvroSchema(String[] fieldNames, TypeInformation[] fieldTypes) {
- final SchemaBuilder.FieldAssembler<Schema> fieldAssembler = SchemaBuilder
- .record("BasicAvroRecord")
- .namespace(NAMESPACE)
- .fields();
-
- final Schema nullSchema = Schema.create(Schema.Type.NULL);
-
- for (int i = 0; i < fieldNames.length; i++) {
- Schema schema = ReflectData.get().getSchema(fieldTypes[i].getTypeClass());
- Schema unionSchema = Schema.createUnion(Arrays.asList(nullSchema, schema));
- fieldAssembler.name(fieldNames[i]).type(unionSchema).noDefault();
- }
-
- return fieldAssembler.endRecord();
- }
-
/**
- * Tests a simple Avro data types without nesting.
+ * Creates test data covering all Avro data types as well as nested types for a specific record.
*/
- public static Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> getSimpleTestData() {
- final Address addr = Address.newBuilder()
- .setNum(42)
- .setStreet("Main Street 42")
- .setCity("Test City")
- .setState("Test State")
- .setZip("12345")
- .build();
-
- final Row rowAddr = new Row(5);
- rowAddr.setField(0, 42);
- rowAddr.setField(1, "Main Street 42");
- rowAddr.setField(2, "Test City");
- rowAddr.setField(3, "Test State");
- rowAddr.setField(4, "12345");
-
- final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> t = new Tuple3<>();
- t.f0 = Address.class;
- t.f1 = addr;
- t.f2 = rowAddr;
-
- return t;
- }
-
- /**
- * Tests all Avro data types as well as nested types.
- */
- public static Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> getComplexTestData() {
+ public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getSpecificTestData() {
final Address addr = Address.newBuilder()
.setNum(42)
.setStreet("Main Street 42")
@@ -122,17 +79,30 @@ public final class AvroTestUtils {
.setTypeDoubleTest(1.337d)
.setTypeNullTest(null)
.setTypeBoolTest(false)
- .setTypeArrayString(new ArrayList<CharSequence>())
- .setTypeArrayBoolean(new ArrayList<Boolean>())
+ .setTypeArrayString(Arrays.asList("hello", "world"))
+ .setTypeArrayBoolean(Arrays.asList(true, true, false))
.setTypeNullableArray(null)
.setTypeEnum(Colors.RED)
- .setTypeMap(new HashMap<CharSequence, Long>())
- .setTypeFixed(null)
- .setTypeUnion(null)
+ .setTypeMap(Collections.singletonMap("test", 12L))
+ .setTypeFixed(new Fixed16(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
+ .setTypeUnion(12.0)
.setTypeNested(addr)
+ .setTypeBytes(ByteBuffer.allocate(10))
+ .setTypeDate(LocalDate.parse("2014-03-01"))
+ .setTypeTimeMillis(LocalTime.parse("12:12:12"))
+ .setTypeTimeMicros(123456)
+ .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
+ .setTypeTimestampMicros(123456L)
+ // byte array must contain the two's-complement representation of the
+ // unscaled integer value in big-endian byte order
+ .setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+ // array of length n can store at most
+ // Math.floor(Math.log10(Math.pow(2, 8 * n - 1) - 1))
+ // base-10 digits of precision
+ .setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.build();
- final Row rowUser = new Row(15);
+ final Row rowUser = new Row(23);
rowUser.setField(0, "Charlie");
rowUser.setField(1, null);
rowUser.setField(2, "blue");
@@ -140,16 +110,24 @@ public final class AvroTestUtils {
rowUser.setField(4, 1.337d);
rowUser.setField(5, null);
rowUser.setField(6, false);
- rowUser.setField(7, new ArrayList<CharSequence>());
- rowUser.setField(8, new ArrayList<Boolean>());
+ rowUser.setField(7, new String[]{"hello", "world"});
+ rowUser.setField(8, new Boolean[]{true, true, false});
rowUser.setField(9, null);
- rowUser.setField(10, Colors.RED);
- rowUser.setField(11, new HashMap<CharSequence, Long>());
- rowUser.setField(12, null);
- rowUser.setField(13, null);
+ rowUser.setField(10, "RED");
+ rowUser.setField(11, Collections.singletonMap("test", 12L));
+ rowUser.setField(12, new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16});
+ rowUser.setField(13, 12.0);
rowUser.setField(14, rowAddr);
-
- final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> t = new Tuple3<>();
+ rowUser.setField(15, new byte[10]);
+ rowUser.setField(16, Date.valueOf("2014-03-01"));
+ rowUser.setField(17, Time.valueOf("12:12:12"));
+ rowUser.setField(18, 123456);
+ rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
+ rowUser.setField(20, 123456L);
+ rowUser.setField(21, BigDecimal.valueOf(2000, 2));
+ rowUser.setField(22, BigDecimal.valueOf(2000, 2));
+
+ final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>();
t.f0 = User.class;
t.f1 = user;
t.f2 = rowUser;
@@ -158,6 +136,109 @@ public final class AvroTestUtils {
}
/**
+ * Creates test data covering almost all Avro data types as well as nested types for a generic record.
+ *
+ * <p>The schema is parsed from an inline JSON string. The record is filled with fixed values and
+ * the row carries the corresponding values: object arrays instead of lists, the enum symbol as a
+ * string, raw byte arrays for the fixed and bytes fields, {@code java.sql} date/time types for
+ * the date, time-millis and timestamp-millis fields, and {@code BigDecimal} for both decimal
+ * fields. The time-micros and timestamp-micros fields stay plain numbers in both record and row.
+ *
+ * <p>NOTE(review): the schema declares {@code type_time_micros} as {@code int} with logical type
+ * {@code time-micros}, whereas the Avro specification defines time-micros on {@code long};
+ * the row expects an int at index 18, so this looks deliberate -- confirm.
+ *
+ * <p>NOTE(review): the {@code type_fixed} field carries an extra field-level {@code "size":16}
+ * attribute next to its union type; presumably a leftover -- confirm it is ignored by the parser.
+ *
+ * <p>NOTE(review): {@code type_fixed} is populated with the generated {@code Fixed16} class while
+ * {@code type_decimal_fixed} uses {@code GenericData.Fixed}; verify the mix is intended for a
+ * generic record.
+ *
+ * @return tuple of (generic record, corresponding row, parsed schema)
+ */
+ public static Tuple3<GenericRecord, Row, Schema> getGenericTestData() {
+ final String schemaString =
+ "{\"type\":\"record\",\"name\":\"GenericUser\",\"namespace\":\"org.apache.flink.formats.avro.generated\"," +
+ "\"fields\": [{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]}," +
+ "{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]}" +
+ ",{\"name\":\"type_double_test\",\"type\":\"double\"},{\"name\":\"type_null_test\",\"type\":[\"null\"]}," +
+ "{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":" +
+ "{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\"," +
+ "\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\"," +
+ "\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\"," +
+ "\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\"," +
+ "\"values\":\"long\"}},{\"name\":\"type_fixed\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"Fixed16\"," +
+ "\"size\":16}],\"size\":16},{\"name\":\"type_union\",\"type\":[\"null\",\"boolean\",\"long\",\"double\"]}," +
+ "{\"name\":\"type_nested\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Address\",\"fields\":[{\"name\":\"num\"," +
+ "\"type\":\"int\"},{\"name\":\"street\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"}," +
+ "{\"name\":\"state\",\"type\":\"string\"},{\"name\":\"zip\",\"type\":\"string\"}]}]},{\"name\":\"type_bytes\"," +
+ "\"type\":\"bytes\"},{\"name\":\"type_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}}," +
+ "{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\"," +
+ "\"type\":{\"type\":\"int\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\"," +
+ "\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\"," +
+ "\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\"," +
+ "\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\"," +
+ "\"name\":\"Fixed2\",\"size\":2,\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}";
+ final Schema schema = new Schema.Parser().parse(schemaString);
+ GenericRecord addr = new GenericData.Record(schema.getField("type_nested").schema().getTypes().get(1));
+ addr.put("num", 42);
+ addr.put("street", "Main Street 42");
+ addr.put("city", "Test City");
+ addr.put("state", "Test State");
+ addr.put("zip", "12345");
+
+ final Row rowAddr = new Row(5);
+ rowAddr.setField(0, 42);
+ rowAddr.setField(1, "Main Street 42");
+ rowAddr.setField(2, "Test City");
+ rowAddr.setField(3, "Test State");
+ rowAddr.setField(4, "12345");
+
+ final GenericRecord user = new GenericData.Record(schema);
+ user.put("name", "Charlie");
+ user.put("favorite_number", null);
+ user.put("favorite_color", "blue");
+ user.put("type_long_test", 1337L);
+ user.put("type_double_test", 1.337d);
+ user.put("type_null_test", null);
+ user.put("type_bool_test", false);
+ user.put("type_array_string", Arrays.asList("hello", "world"));
+ user.put("type_array_boolean", Arrays.asList(true, true, false));
+ user.put("type_nullable_array", null);
+ user.put("type_enum", new GenericData.EnumSymbol(schema.getField("type_enum").schema(), "RED"));
+ user.put("type_map", Collections.singletonMap("test", 12L));
+ user.put("type_fixed", new Fixed16(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}));
+ user.put("type_union", 12.0);
+ user.put("type_nested", addr);
+ user.put("type_bytes", ByteBuffer.allocate(10));
+ user.put("type_date", LocalDate.parse("2014-03-01"));
+ user.put("type_time_millis", LocalTime.parse("12:12:12"));
+ user.put("type_time_micros", 123456);
+ user.put("type_timestamp_millis", DateTime.parse("2014-03-01T12:12:12.321Z"));
+ user.put("type_timestamp_micros", 123456L);
+ user.put("type_decimal_bytes",
+ ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+ user.put("type_decimal_fixed",
+ new GenericData.Fixed(
+ schema.getField("type_decimal_fixed").schema(),
+ BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+
+ final Row rowUser = new Row(23);
+ rowUser.setField(0, "Charlie");
+ rowUser.setField(1, null);
+ rowUser.setField(2, "blue");
+ rowUser.setField(3, 1337L);
+ rowUser.setField(4, 1.337d);
+ rowUser.setField(5, null);
+ rowUser.setField(6, false);
+ rowUser.setField(7, new String[]{"hello", "world"});
+ rowUser.setField(8, new Boolean[]{true, true, false});
+ rowUser.setField(9, null);
+ rowUser.setField(10, "RED");
+ rowUser.setField(11, Collections.singletonMap("test", 12L));
+ rowUser.setField(12, new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16});
+ rowUser.setField(13, 12.0);
+ rowUser.setField(14, rowAddr);
+ rowUser.setField(15, new byte[10]);
+ rowUser.setField(16, Date.valueOf("2014-03-01"));
+ rowUser.setField(17, Time.valueOf("12:12:12"));
+ rowUser.setField(18, 123456);
+ rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
+ rowUser.setField(20, 123456L);
+ rowUser.setField(21, BigDecimal.valueOf(2000, 2));
+ rowUser.setField(22, BigDecimal.valueOf(2000, 2));
+
+ final Tuple3<GenericRecord, Row, Schema> t = new Tuple3<>();
+ t.f0 = user;
+ t.f1 = rowUser;
+ t.f2 = schema;
+
+ return t;
+ }
+
+ /**
* Writes given record using specified schema.
* @param record record to serialize
* @param schema schema to use for serialization
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
index 9205627..a4c5bf8 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
@@ -21,8 +21,16 @@ package org.apache.flink.formats.avro.utils;
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.Fixed2;
+import org.apache.flink.formats.avro.generated.SimpleUser;
import org.apache.flink.formats.avro.generated.User;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -49,7 +57,35 @@ public class TestDataGenerator {
new HashMap<>(),
generateRandomFixed16(rnd),
generateRandomUnion(rnd),
- generateRandomAddress(rnd));
+ generateRandomAddress(rnd),
+ generateRandomBytes(rnd),
+ LocalDate.parse("2014-03-01"),
+ LocalTime.parse("12:12:12"),
+ 123456,
+ DateTime.parse("2014-03-01T12:12:12.321Z"),
+ 123456L,
+ ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()),
+ new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+ }
+
+ /**
+ * Creates a {@code SimpleUser} whose constructor arguments are drawn from the given random source.
+ *
+ * <p>Several arguments are randomly either {@code null} or populated (decided by
+ * {@code rnd.nextBoolean()}); the sixth argument is always {@code null} and the map argument is
+ * always an empty map. Values are drawn from {@code rnd} in argument order, so the same seed
+ * reproduces the same record.
+ *
+ * @param rnd source of randomness
+ * @return the generated user
+ */
+ public static SimpleUser generateRandomSimpleUser(Random rnd) {
+ return new SimpleUser(
+ generateRandomString(rnd, 50),
+ rnd.nextBoolean() ? null : rnd.nextInt(),
+ rnd.nextBoolean() ? null : generateRandomString(rnd, 6),
+ rnd.nextBoolean() ? null : rnd.nextLong(),
+ rnd.nextDouble(),
+ null,
+ rnd.nextBoolean(),
+ generateRandomStringList(rnd, 20, 30),
+ generateRandomBooleanList(rnd, 20),
+ rnd.nextBoolean() ? null : generateRandomStringList(rnd, 20, 20),
+ generateRandomColor(rnd),
+ new HashMap<>(),
+ generateRandomFixed16(rnd),
+ generateRandomUnion(rnd),
+ generateRandomAddress(rnd),
+ generateRandomBytes(rnd));
}
public static Colors generateRandomColor(Random rnd) {
@@ -76,6 +112,12 @@ public class TestDataGenerator {
generateRandomString(rnd, 20));
}
+ /**
+ * Creates a buffer wrapping 10 bytes drawn from the given random source.
+ *
+ * @param rnd source of randomness
+ * @return a {@code ByteBuffer} backed by the freshly filled 10-byte array
+ */
+ public static ByteBuffer generateRandomBytes(Random rnd) {
+ final byte[] bytes = new byte[10];
+ rnd.nextBytes(bytes);
+ return ByteBuffer.wrap(bytes);
+ }
+
private static List<Boolean> generateRandomBooleanList(Random rnd, int maxEntries) {
final int num = rnd.nextInt(maxEntries + 1);
ArrayList<Boolean> list = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
index 2345553..342b32c 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.api.ValidationException;
import org.junit.Test;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,12 +38,20 @@ public class AvroTest extends DescriptorTestBase {
removePropertyAndVerify(descriptors().get(0), "format.record-class");
}
+ /**
+ * Adds a {@code format.avro-schema} property to the first descriptor (the one configured with a
+ * record class) and expects verification to fail with a {@code ValidationException}.
+ */
+ @Test(expected = ValidationException.class)
+ public void testRecordClassAndAvroSchema() {
+ addPropertyAndVerify(descriptors().get(0), "format.avro-schema", "{...}");
+ }
+
// --------------------------------------------------------------------------------------------
@Override
public List<Descriptor> descriptors() {
final Descriptor desc1 = new Avro().recordClass(User.class);
- return Collections.singletonList(desc1);
+
+ final Descriptor desc2 = new Avro().avroSchema("{...}");
+
+ return Arrays.asList(desc1, desc2);
}
@Override
@@ -53,7 +61,12 @@ public class AvroTest extends DescriptorTestBase {
props1.put("format.property-version", "1");
props1.put("format.record-class", "org.apache.flink.formats.avro.generated.User");
- return Collections.singletonList(props1);
+ final Map<String, String> props2 = new HashMap<>();
+ props2.put("format.type", "avro");
+ props2.put("format.property-version", "1");
+ props2.put("format.avro-schema", "{...}");
+
+ return Arrays.asList(props1, props2);
}
@Override