Posted to commits@flink.apache.org by se...@apache.org on 2017/11/03 17:11:31 UTC
[08/21] flink git commit: [FLINK-7420] [avro] Move all Avro code to flink-avro
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
index d916116..048e7ac 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
@@ -23,12 +23,11 @@ import java.util.StringTokenizer
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.twitter.TwitterSource
import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
import org.apache.flink.util.Collector
-import org.codehaus.jackson.JsonNode
-import org.codehaus.jackson.map.ObjectMapper
import scala.collection.mutable.ListBuffer
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml
new file mode 100644
index 0000000..19d9129
--- /dev/null
+++ b/flink-formats/flink-avro/pom.xml
@@ -0,0 +1,280 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-formats</artifactId>
+ <version>1.4-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-avro_${scala.binary.version}</artifactId>
+ <name>flink-avro</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <!-- core dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.8.2</version>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils-junit</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>create-test-dependency</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.flink.formats.avro.testjar.AvroExternalJarProgram</mainClass>
+ </manifest>
+ </archive>
+ <finalName>maven</finalName>
+ <attach>false</attach>
+ <descriptors>
+ <descriptor>src/test/assembly/test-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+			<!-- Remove the AvroExternalJarProgram code from the test-classes directory, since it mustn't be on the
+				classpath when running the tests, in order to actually test whether the user-code
+				class loader is properly used. -->
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.5</version><!--$NO-MVN-MAN-VER$-->
+ <executions>
+ <execution>
+ <id>remove-avroexternalprogram</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>clean</goal>
+ </goals>
+ <configuration>
+ <excludeDefaultDirectories>true</excludeDefaultDirectories>
+ <filesets>
+ <fileset>
+ <directory>${project.build.testOutputDirectory}</directory>
+ <includes>
+ <include>**/testjar/*.class</include>
+ </includes>
+ </fileset>
+ </filesets>
+ </configuration>
+ </execution>
+ </executions>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>${project.basedir}/src/test/java/org/apache/flink/formats/avro/generated</directory>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ <!-- Generate Test class from avro schema -->
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>1.8.2</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ <configuration>
+ <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
+ <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Add Avro test classes to test jar in order to test AvroTypeInfo. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration combine.self="override">
+ <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+ <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+ <artifactSet>
+ <includes>
+ <include>org.codehaus.jackson:*</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>org.codehaus.jackson</pattern>
+							<shadedPattern>org.apache.flink.avro.shaded.org.codehaus.jackson</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <versionRange>[2.4,)</versionRange>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-clean-plugin</artifactId>
+ <versionRange>[1,)</versionRange>
+ <goals>
+ <goal>clean</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <versionRange>[1.7.7,)</versionRange>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
new file mode 100644
index 0000000..58085f6
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.Public;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+import static org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema;
+
+/**
+ * @deprecated Please use <code>org.apache.flink.formats.avro.typeutils.AvroTypeInfo</code>
+ * in the <code>flink-avro</code> module. This class will be removed in the near future.
+ */
+@Deprecated
+@Public
+public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
+
+ public AvroTypeInfo(Class<T> typeClass) {
+ super(typeClass, generateFieldsFromAvroSchema(typeClass));
+ }
+}
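
For callers, migrating off this deprecated class is purely a package change; a minimal sketch, assuming a hypothetical Avro-generated class User extending SpecificRecordBase:

    // old, deprecated location (kept only for compatibility)
    org.apache.flink.api.java.typeutils.AvroTypeInfo<User> oldInfo =
        new org.apache.flink.api.java.typeutils.AvroTypeInfo<>(User.class);

    // new location in the flink-avro module
    org.apache.flink.formats.avro.typeutils.AvroTypeInfo<User> newInfo =
        new org.apache.flink.formats.avro.typeutils.AvroTypeInfo<>(User.class);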
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java
new file mode 100644
index 0000000..9b73ceb
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.utils.FSDataInputStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Provides a {@link FileInputFormat} for Avro records.
+ *
+ * @param <E>
+ * the type of the result Avro record. If you specify
+ * {@link GenericRecord} then the result will be returned as a
+ * {@link GenericRecord}, so you do not have to know the schema ahead
+ * of time.
+ */
+public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E>,
+ CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
+
+ private final Class<E> avroValueType;
+
+ private boolean reuseAvroValue = true;
+
+ private transient DataFileReader<E> dataFileReader;
+
+ private transient long end;
+
+ private transient long recordsReadSinceLastSync;
+
+ private long lastSync = -1L;
+
+ public AvroInputFormat(Path filePath, Class<E> type) {
+ super(filePath);
+ this.avroValueType = type;
+ }
+
+ /**
+ * Sets the flag whether to reuse the Avro value instance for all records.
+ * By default, the input format reuses the Avro value.
+ *
+ * @param reuseAvroValue True, if the input format should reuse the Avro value instance, false otherwise.
+ */
+ public void setReuseAvroValue(boolean reuseAvroValue) {
+ this.reuseAvroValue = reuseAvroValue;
+ }
+
+ /**
+	 * If set to true, the input format will not split files; each file is read as a whole.
+ */
+ public void setUnsplittable(boolean unsplittable) {
+ this.unsplittable = unsplittable;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Typing
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public TypeInformation<E> getProducedType() {
+ return TypeExtractor.getForClass(this.avroValueType);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Input Format Methods
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void open(FileInputSplit split) throws IOException {
+ super.open(split);
+ dataFileReader = initReader(split);
+ dataFileReader.sync(split.getStart());
+ lastSync = dataFileReader.previousSync();
+ }
+
+ private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
+ DatumReader<E> datumReader;
+
+ if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
+ datumReader = new GenericDatumReader<E>();
+ } else {
+ datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
+ ? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Opening split {}", split);
+ }
+
+ SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
+ DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
+ }
+
+ end = split.getStart() + split.getLength();
+ recordsReadSinceLastSync = 0;
+ return dataFileReader;
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ return !dataFileReader.hasNext() || dataFileReader.pastSync(end);
+ }
+
+ public long getRecordsReadFromBlock() {
+ return this.recordsReadSinceLastSync;
+ }
+
+ @Override
+ public E nextRecord(E reuseValue) throws IOException {
+ if (reachedEnd()) {
+ return null;
+ }
+
+ // if we start a new block, then register the event, and
+ // restart the counter.
+ if (dataFileReader.previousSync() != lastSync) {
+ lastSync = dataFileReader.previousSync();
+ recordsReadSinceLastSync = 0;
+ }
+ recordsReadSinceLastSync++;
+
+ if (reuseAvroValue) {
+ return dataFileReader.next(reuseValue);
+ } else {
+ if (GenericRecord.class == avroValueType) {
+ return dataFileReader.next();
+ } else {
+ return dataFileReader.next(InstantiationUtil.instantiate(avroValueType, Object.class));
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Checkpointing
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public Tuple2<Long, Long> getCurrentState() throws IOException {
+ return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
+ }
+
+ @Override
+ public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
+ Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
+ Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
+
+ try {
+ this.open(split);
+ } finally {
+ if (state.f0 != -1) {
+ lastSync = state.f0;
+ recordsReadSinceLastSync = state.f1;
+ }
+ }
+
+ if (lastSync != -1) {
+			// seek to the last sync point and read (and discard) the records
+			// that were already consumed before the checkpoint
+ dataFileReader.seek(lastSync);
+ for (int i = 0; i < recordsReadSinceLastSync; i++) {
+ dataFileReader.next(null);
+ }
+ }
+ }
+}
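
A minimal usage sketch for the input format above, assuming a DataSet program and a placeholder HDFS path; using GenericRecord means the schema does not need to be known at compile time:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.avro.AvroInputFormat;

    public class AvroReadSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // the path is a placeholder; GenericRecord defers the schema to runtime
            AvroInputFormat<GenericRecord> format = new AvroInputFormat<>(
                    new Path("hdfs:///path/to/users.avro"), GenericRecord.class);
            DataSet<GenericRecord> records = env.createInput(format);
            records.print();
        }
    }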
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroOutputFormat.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroOutputFormat.java
new file mode 100644
index 0000000..c0b8073
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroOutputFormat.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.Path;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link FileOutputFormat} for Avro records.
+ * @param <E> the type of the records written by this output format.
+ */
+public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
+
+ /**
+	 * Enumeration of the supported codecs, each associated with a serialization byte.
+ */
+ public enum Codec {
+
+ NULL((byte) 0, CodecFactory.nullCodec()),
+ SNAPPY((byte) 1, CodecFactory.snappyCodec()),
+ BZIP2((byte) 2, CodecFactory.bzip2Codec()),
+ DEFLATE((byte) 3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
+ XZ((byte) 4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));
+
+ private byte codecByte;
+
+ private CodecFactory codecFactory;
+
+ Codec(final byte codecByte, final CodecFactory codecFactory) {
+ this.codecByte = codecByte;
+ this.codecFactory = codecFactory;
+ }
+
+ private byte getCodecByte() {
+ return codecByte;
+ }
+
+ private CodecFactory getCodecFactory() {
+ return codecFactory;
+ }
+
+ private static Codec forCodecByte(byte codecByte) {
+ for (final Codec codec : Codec.values()) {
+ if (codec.getCodecByte() == codecByte) {
+ return codec;
+ }
+ }
+ throw new IllegalArgumentException("no codec for codecByte: " + codecByte);
+ }
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ private final Class<E> avroValueType;
+
+ private transient Schema userDefinedSchema = null;
+
+ private transient Codec codec = null;
+
+ private transient DataFileWriter<E> dataFileWriter;
+
+ public AvroOutputFormat(Path filePath, Class<E> type) {
+ super(filePath);
+ this.avroValueType = type;
+ }
+
+ public AvroOutputFormat(Class<E> type) {
+ this.avroValueType = type;
+ }
+
+ @Override
+ protected String getDirectoryFileName(int taskNumber) {
+ return super.getDirectoryFileName(taskNumber) + ".avro";
+ }
+
+ public void setSchema(Schema schema) {
+ this.userDefinedSchema = schema;
+ }
+
+ /**
+	 * Sets the Avro codec used for compression.
+	 *
+	 * @param codec the Avro codec.
+ */
+ public void setCodec(final Codec codec) {
+ this.codec = checkNotNull(codec, "codec can not be null");
+ }
+
+ @Override
+ public void writeRecord(E record) throws IOException {
+ dataFileWriter.append(record);
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ super.open(taskNumber, numTasks);
+
+ DatumWriter<E> datumWriter;
+ Schema schema;
+ if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
+ datumWriter = new SpecificDatumWriter<E>(avroValueType);
+ try {
+ schema = ((org.apache.avro.specific.SpecificRecordBase) avroValueType.newInstance()).getSchema();
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException(e.getMessage());
+ }
+ } else if (org.apache.avro.generic.GenericRecord.class.isAssignableFrom(avroValueType)) {
+ if (userDefinedSchema == null) {
+ throw new IllegalStateException("Schema must be set when using Generic Record");
+ }
+ datumWriter = new GenericDatumWriter<E>(userDefinedSchema);
+ schema = userDefinedSchema;
+ } else {
+ datumWriter = new ReflectDatumWriter<E>(avroValueType);
+ schema = ReflectData.get().getSchema(avroValueType);
+ }
+ dataFileWriter = new DataFileWriter<E>(datumWriter);
+ if (codec != null) {
+ dataFileWriter.setCodec(codec.getCodecFactory());
+ }
+ if (userDefinedSchema == null) {
+ dataFileWriter.create(schema, stream);
+ } else {
+ dataFileWriter.create(userDefinedSchema, stream);
+ }
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+
+ if (codec != null) {
+ out.writeByte(codec.getCodecByte());
+ } else {
+ out.writeByte(-1);
+ }
+
+ if (userDefinedSchema != null) {
+ byte[] json = userDefinedSchema.toString().getBytes(ConfigConstants.DEFAULT_CHARSET);
+ out.writeInt(json.length);
+ out.write(json);
+ } else {
+ out.writeInt(0);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+
+ byte codecByte = in.readByte();
+ if (codecByte >= 0) {
+ setCodec(Codec.forCodecByte(codecByte));
+ }
+
+ int length = in.readInt();
+ if (length != 0) {
+ byte[] json = new byte[length];
+ in.readFully(json);
+
+ Schema schema = new Schema.Parser().parse(new String(json, ConfigConstants.DEFAULT_CHARSET));
+ setSchema(schema);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ dataFileWriter.flush();
+ dataFileWriter.close();
+ super.close();
+ }
+}
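
A usage sketch for the output format, assuming a hypothetical generated record class User and an existing DataSet<User> named records; setCodec enables block compression of the written Avro files:

    // records is a DataSet<User>; the output path is a placeholder
    AvroOutputFormat<User> format = new AvroOutputFormat<>(
            new Path("file:///tmp/users-out"), User.class);
    format.setCodec(AvroOutputFormat.Codec.SNAPPY);
    records.output(format);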
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
new file mode 100644
index 0000000..4a3c02e
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+/**
+ * Deserialization schema from Avro bytes of a {@link SpecificRecord} to {@link Row}.
+ *
+ * <p>Deserializes the <code>byte[]</code> messages into (nested) Flink rows.
+ *
+ * <p>{@link Utf8} fields are converted to regular Java strings.
+ */
+public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
+
+ /**
+ * Avro record class.
+ */
+ private Class<? extends SpecificRecord> recordClazz;
+
+ /**
+ * Schema for deterministic field order.
+ */
+ private transient Schema schema;
+
+ /**
+ * Reader that deserializes byte array into a record.
+ */
+ private transient DatumReader<SpecificRecord> datumReader;
+
+ /**
+ * Input stream to read message from.
+ */
+ private transient MutableByteArrayInputStream inputStream;
+
+ /**
+ * Avro decoder that decodes binary data.
+ */
+ private transient Decoder decoder;
+
+ /**
+ * Record to deserialize byte array to.
+ */
+ private SpecificRecord record;
+
+ /**
+	 * Creates an Avro deserialization schema for the given record.
+ *
+ * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
+ */
+ public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
+ Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+ this.recordClazz = recordClazz;
+ this.schema = SpecificData.get().getSchema(recordClazz);
+ this.datumReader = new SpecificDatumReader<>(schema);
+ this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
+ this.inputStream = new MutableByteArrayInputStream();
+ this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+ }
+
+ @Override
+ public Row deserialize(byte[] message) throws IOException {
+ // read record
+ try {
+ inputStream.setBuffer(message);
+ this.record = datumReader.read(record, decoder);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to deserialize Row.", e);
+ }
+
+ // convert to row
+ final Object row = convertToRow(schema, record);
+ return (Row) row;
+ }
+
+ private void writeObject(ObjectOutputStream oos) throws IOException {
+ oos.writeObject(recordClazz);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
+ this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
+ this.schema = SpecificData.get().getSchema(recordClazz);
+ this.datumReader = new SpecificDatumReader<>(schema);
+ this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
+ this.inputStream = new MutableByteArrayInputStream();
+ this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+ }
+
+ /**
+ * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
+ * Avro's {@link Utf8} fields are converted into regular Java strings.
+ */
+ private static Object convertToRow(Schema schema, Object recordObj) {
+ if (recordObj instanceof GenericRecord) {
+ // records can be wrapped in a union
+ if (schema.getType() == Schema.Type.UNION) {
+ final List<Schema> types = schema.getTypes();
+ if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+ schema = types.get(1);
+ }
+ else {
+ throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema);
+ }
+ } else if (schema.getType() != Schema.Type.RECORD) {
+ throw new RuntimeException("Record type for row type expected. But is: " + schema);
+ }
+ final List<Schema.Field> fields = schema.getFields();
+ final Row row = new Row(fields.size());
+ final GenericRecord record = (GenericRecord) recordObj;
+ for (int i = 0; i < fields.size(); i++) {
+ final Schema.Field field = fields.get(i);
+ row.setField(i, convertToRow(field.schema(), record.get(field.pos())));
+ }
+ return row;
+ } else if (recordObj instanceof Utf8) {
+ return recordObj.toString();
+ } else {
+ return recordObj;
+ }
+ }
+
+ /**
+	 * An extension of ByteArrayInputStream that allows changing the buffer to be read
+	 * without creating a new ByteArrayInputStream instance. This makes it possible to re-use the
+	 * same InputStream instance and avoids copying the message and creating a new Decoder per message.
+ */
+ private static final class MutableByteArrayInputStream extends ByteArrayInputStream {
+
+ public MutableByteArrayInputStream() {
+ super(new byte[0]);
+ }
+
+ /**
+ * Set buffer that can be read via the InputStream interface and reset the input stream.
+ * This has the same effect as creating a new ByteArrayInputStream with a new buffer.
+ *
+ * @param buf the new buffer to read.
+ */
+ public void setBuffer(byte[] buf) {
+ this.buf = buf;
+ this.pos = 0;
+ this.count = buf.length;
+ }
+ }
+}
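
A sketch of the deserialization schema in isolation, assuming a hypothetical generated class User with a string field at position 0 (in practice the schema is usually handed to a connector, e.g. a Kafka consumer):

    AvroRowDeserializationSchema deserializer = new AvroRowDeserializationSchema(User.class);
    Row row = deserializer.deserialize(messageBytes);  // messageBytes: Avro-encoded User
    String name = (String) row.getField(0);            // Utf8 already converted to String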
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
new file mode 100644
index 0000000..41000a6
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+/**
+ * Serialization schema that serializes a {@link Row} into Avro bytes, based on a {@link SpecificRecord} schema.
+ */
+public class AvroRowSerializationSchema implements SerializationSchema<Row> {
+
+ /**
+ * Avro record class.
+ */
+ private Class<? extends SpecificRecord> recordClazz;
+
+ /**
+ * Avro serialization schema.
+ */
+ private transient Schema schema;
+
+ /**
+ * Writer to serialize Avro record into a byte array.
+ */
+ private transient DatumWriter<GenericRecord> datumWriter;
+
+ /**
+ * Output stream to serialize records into byte array.
+ */
+ private transient ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+
+ /**
+ * Low-level class for serialization of Avro values.
+ */
+ private transient Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+ /**
+	 * Creates an Avro serialization schema for the given record class.
+	 *
+	 * @param recordClazz Avro record class used to serialize Flink's row to Avro's record
+ */
+ public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+ Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+ this.recordClazz = recordClazz;
+ this.schema = SpecificData.get().getSchema(recordClazz);
+ this.datumWriter = new SpecificDatumWriter<>(schema);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public byte[] serialize(Row row) {
+ // convert to record
+ final Object record = convertToRecord(schema, row);
+
+ // write
+ try {
+ arrayOutputStream.reset();
+ datumWriter.write((GenericRecord) record, encoder);
+ encoder.flush();
+ return arrayOutputStream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to serialize Row.", e);
+ }
+ }
+
+ private void writeObject(ObjectOutputStream oos) throws IOException {
+ oos.writeObject(recordClazz);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
+ this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
+ this.schema = SpecificData.get().getSchema(recordClazz);
+ this.datumWriter = new SpecificDatumWriter<>(schema);
+ this.arrayOutputStream = new ByteArrayOutputStream();
+ this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+ }
+
+ /**
+ * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+ * Strings are converted into Avro's {@link Utf8} fields.
+ */
+ private static Object convertToRecord(Schema schema, Object rowObj) {
+ if (rowObj instanceof Row) {
+ // records can be wrapped in a union
+ if (schema.getType() == Schema.Type.UNION) {
+ final List<Schema> types = schema.getTypes();
+ if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+ schema = types.get(1);
+ }
+ else if (types.size() == 2 && types.get(0).getType() == Schema.Type.RECORD && types.get(1).getType() == Schema.Type.NULL) {
+ schema = types.get(0);
+ }
+ else {
+					throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD] or UNION[RECORD, null]. Given: " + schema);
+ }
+ } else if (schema.getType() != Schema.Type.RECORD) {
+ throw new RuntimeException("Record type for row type expected. But is: " + schema);
+ }
+ final List<Schema.Field> fields = schema.getFields();
+ final GenericRecord record = new GenericData.Record(schema);
+ final Row row = (Row) rowObj;
+ for (int i = 0; i < fields.size(); i++) {
+ final Schema.Field field = fields.get(i);
+ record.put(field.pos(), convertToRecord(field.schema(), row.getField(i)));
+ }
+ return record;
+ } else if (rowObj instanceof String) {
+ return new Utf8((String) rowObj);
+ } else {
+ return rowObj;
+ }
+ }
+}
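
The serialization schema is the inverse direction; a sketch assuming the same hypothetical User record with two fields (name, favoriteNumber):

    AvroRowSerializationSchema serializer = new AvroRowSerializationSchema(User.class);
    Row row = new Row(2);
    row.setField(0, "Alice");   // becomes an Avro Utf8 on the wire
    row.setField(1, 42);
    byte[] bytes = serializer.serialize(row);
    // bytes can be fed back through the AvroRowDeserializationSchema above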
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
new file mode 100644
index 0000000..02f74f5
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
+import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.formats.avro.utils.DataInputDecoder;
+import org.apache.flink.formats.avro.utils.DataOutputEncoder;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and
+ * Kryo for deep object copies. We want to change this to Kryo-only.
+ *
+ * @param <T> The type serialized.
+ */
+@Internal
+public final class AvroSerializer<T> extends TypeSerializer<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Class<T> type;
+
+ private final Class<? extends T> typeToInstantiate;
+
+ /**
+ * Map of class tag (using classname as tag) to their Kryo registration.
+ *
+ * <p>This map serves as a preview of the final registration result of
+ * the Kryo instance, taking into account registration overwrites.
+ */
+ private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
+
+ private transient ReflectDatumWriter<T> writer;
+ private transient ReflectDatumReader<T> reader;
+
+ private transient DataOutputEncoder encoder;
+ private transient DataInputDecoder decoder;
+
+ private transient Kryo kryo;
+
+ private transient T deepCopyInstance;
+
+ // --------------------------------------------------------------------------------------------
+
+ public AvroSerializer(Class<T> type) {
+ this(type, type);
+ }
+
+ public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
+ this.type = checkNotNull(type);
+ this.typeToInstantiate = checkNotNull(typeToInstantiate);
+
+ InstantiationUtil.checkForInstantiation(typeToInstantiate);
+
+ this.kryoRegistrations = buildKryoRegistrations(type);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public AvroSerializer<T> duplicate() {
+ return new AvroSerializer<T>(type, typeToInstantiate);
+ }
+
+ @Override
+ public T createInstance() {
+ return InstantiationUtil.instantiate(this.typeToInstantiate);
+ }
+
+ @Override
+ public T copy(T from) {
+ checkKryoInitialized();
+
+ return KryoUtils.copy(from, kryo, this);
+ }
+
+ @Override
+ public T copy(T from, T reuse) {
+ checkKryoInitialized();
+
+ return KryoUtils.copy(from, reuse, kryo, this);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(T value, DataOutputView target) throws IOException {
+ checkAvroInitialized();
+ this.encoder.setOut(target);
+ this.writer.write(value, this.encoder);
+ }
+
+ @Override
+ public T deserialize(DataInputView source) throws IOException {
+ checkAvroInitialized();
+ this.decoder.setIn(source);
+ return this.reader.read(null, this.decoder);
+ }
+
+ @Override
+ public T deserialize(T reuse, DataInputView source) throws IOException {
+ checkAvroInitialized();
+ this.decoder.setIn(source);
+ return this.reader.read(reuse, this.decoder);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ checkAvroInitialized();
+
+ if (this.deepCopyInstance == null) {
+ this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
+ }
+
+ this.decoder.setIn(source);
+ this.encoder.setOut(target);
+
+ T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
+ this.writer.write(tmp, this.encoder);
+ }
+
+ private void checkAvroInitialized() {
+ if (this.reader == null) {
+ this.reader = new ReflectDatumReader<T>(type);
+ this.writer = new ReflectDatumWriter<T>(type);
+ this.encoder = new DataOutputEncoder();
+ this.decoder = new DataInputDecoder();
+ }
+ }
+
+ private void checkKryoInitialized() {
+ if (this.kryo == null) {
+ this.kryo = new Kryo();
+
+ Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
+ instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+ kryo.setInstantiatorStrategy(instantiatorStrategy);
+
+ kryo.setAsmEnabled(true);
+
+ KryoUtils.applyRegistrations(kryo, kryoRegistrations.values());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof AvroSerializer) {
+ @SuppressWarnings("unchecked")
+ AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;
+
+ return avroSerializer.canEqual(this) &&
+ type == avroSerializer.type &&
+ typeToInstantiate == avroSerializer.typeToInstantiate;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof AvroSerializer;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Serializer configuration snapshotting & compatibility
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
+ return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
+ final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot;
+
+ if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) {
+ // resolve Kryo registrations; currently, since the Kryo registrations in Avro
+ // are fixed, there shouldn't be a problem with the resolution here.
+
+ LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations();
+ oldRegistrations.putAll(kryoRegistrations);
+
+ for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
+ if (reconfiguredRegistrationEntry.getValue().isDummy()) {
+ return CompatibilityResult.requiresMigration();
+ }
+ }
+
+ this.kryoRegistrations = oldRegistrations;
+ return CompatibilityResult.compatible();
+ }
+ }
+
+		// ends up here if the preceding serializer is not
+		// an AvroSerializer, or the serialized data type has changed
+ return CompatibilityResult.requiresMigration();
+ }
+
+ /**
+ * {@link TypeSerializerConfigSnapshot} for Avro.
+ */
+ public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {
+
+ private static final int VERSION = 1;
+
+ private Class<? extends T> typeToInstantiate;
+
+ public AvroSerializerConfigSnapshot() {}
+
+ public AvroSerializerConfigSnapshot(
+ Class<T> baseType,
+ Class<? extends T> typeToInstantiate,
+ LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
+
+ super(baseType, kryoRegistrations);
+ this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate);
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ super.write(out);
+
+ out.writeUTF(typeToInstantiate.getName());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void read(DataInputView in) throws IOException {
+ super.read(in);
+
+ String classname = in.readUTF();
+ try {
+ typeToInstantiate = (Class<? extends T>) Class.forName(classname, true, getUserCodeClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Cannot find requested class " + classname + " in classpath.", e);
+ }
+ }
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ public Class<? extends T> getTypeToInstantiate() {
+ return typeToInstantiate;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+
+ // kryoRegistrations may be null if this Avro serializer is deserialized from an old version
+ if (kryoRegistrations == null) {
+ this.kryoRegistrations = buildKryoRegistrations(type);
+ }
+ }
+
+ private static <T> LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(Class<T> serializedDataType) {
+ final LinkedHashMap<String, KryoRegistration> registrations = new LinkedHashMap<>();
+
+ // register Avro types.
+ registrations.put(
+ GenericData.Array.class.getName(),
+ new KryoRegistration(
+ GenericData.Array.class,
+ new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
+ registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class));
+ registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class));
+ registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class));
+ registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class));
+
+ // register the serialized data type
+ registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType));
+
+ return registrations;
+ }
+}
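
A round-trip sketch for the serializer, assuming a hypothetical Avro-generated class User and an instance user; the stream wrappers from org.apache.flink.core.memory adapt plain Java streams to Flink's DataOutputView/DataInputView:

    AvroSerializer<User> serializer = new AvroSerializer<>(User.class);

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    serializer.serialize(user, new DataOutputViewStreamWrapper(out));

    User copy = serializer.deserialize(
            new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray())));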
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
new file mode 100644
index 0000000..ddc89a8
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Special type information for Avro POJOs, i.e., generated classes extending SpecificRecordBase (the typed Avro records).
+ *
+ * <p>Approach: it runs a regular POJO type analysis and replaces every {@code GenericType<CharSequence>}
+ * with a {@code GenericType<avro.Utf8>}. All other types used by Avro are standard Java types.
+ * Only strings are declared as CharSequence fields and are represented as Utf8 instances at runtime.
+ * Since CharSequence is not comparable, we replace those fields with generic type infos containing the
+ * Utf8 class (which is comparable), so that they work nicely with field expressions.
+ *
+ * <p>This class is checked by the AvroPojoTest.
+ */
+public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
+
+ public AvroTypeInfo(Class<T> typeClass) {
+ super(typeClass, generateFieldsFromAvroSchema(typeClass));
+ }
+
+ @Override
+ public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+ return super.createSerializer(config);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Internal
+ public static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) {
+ PojoTypeExtractor pte = new PojoTypeExtractor();
+ ArrayList<Type> typeHierarchy = new ArrayList<>();
+ typeHierarchy.add(typeClass);
+ TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);
+
+ if (!(ti instanceof PojoTypeInfo)) {
+ throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
+ }
+ PojoTypeInfo pti = (PojoTypeInfo) ti;
+ List<PojoField> newFields = new ArrayList<>(pti.getTotalFields());
+
+ for (int i = 0; i < pti.getArity(); i++) {
+ PojoField f = pti.getPojoFieldAt(i);
+ TypeInformation newType = f.getTypeInformation();
+ // check if type is a CharSequence
+ if (newType instanceof GenericTypeInfo) {
+ if ((newType).getTypeClass().equals(CharSequence.class)) {
+ // replace the type by a org.apache.avro.util.Utf8
+ newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class);
+ }
+ }
+ PojoField newField = new PojoField(f.getField(), newType);
+ newFields.add(newField);
+ }
+ return newFields;
+ }
+
+ private static class PojoTypeExtractor extends TypeExtractor {
+ private PojoTypeExtractor() {
+ super();
+ }
+
+ @Override
+ public <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
+ ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+ return super.analyzePojo(clazz, typeHierarchy, parameterizedType, in1Type, in2Type);
+ }
+ }
+}
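
The practical effect of the CharSequence-to-Utf8 replacement is that string fields of generated records become usable in field expressions; a sketch, assuming a hypothetical User record with a CharSequence field "name" and a DataSet<User> named users typed with this type information:

    AvroTypeInfo<User> typeInfo = new AvroTypeInfo<>(User.class);
    // allowed, because "name" is typed as the comparable avro Utf8
    // rather than the non-comparable CharSequence
    users.groupBy("name");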
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
new file mode 100644
index 0000000..7305f23
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+
+import java.io.Serializable;
+
+/**
+ * Utilities for integrating Avro serializers in Kryo.
+ */
+public class AvroKryoSerializerUtils {
+
+ public static void addAvroSerializers(ExecutionConfig reg, Class<?> type) {
+ // Avro POJOs contain java.util.List fields whose runtime type is GenericData.Array.
+ // Because Kryo cannot serialize these properly, we register this serializer for them.
+ reg.registerTypeWithKryoSerializer(GenericData.Array.class, Serializers.SpecificInstanceCollectionSerializerForArrayList.class);
+
+ // We register this serializer for users who want to use untyped Avro records (GenericData.Record).
+ // Kryo can serialize everything in such records except the Schema.
+ // This serializer is slow, but using GenericData.Record with Kryo is discouraged in general anyway.
+ // We add it as a default serializer because Avro uses a private sub-type at runtime.
+ reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
+ }
+
+ /**
+ * Slow serialization approach for Avro schemas.
+ * This is only used with {@link org.apache.avro.generic.GenericData.Record} types.
+ * With this serializer we are able to handle Avro records.
+ */
+ public static class AvroSchemaSerializer extends Serializer<Schema> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void write(Kryo kryo, Output output, Schema object) {
+ String schemaAsString = object.toString(false);
+ output.writeString(schemaAsString);
+ }
+
+ @Override
+ public Schema read(Kryo kryo, Input input, Class<Schema> type) {
+ String schemaAsString = input.readString();
+ // the parser seems to be stateful, so we need a new one for every type.
+ Schema.Parser sParser = new Schema.Parser();
+ return sParser.parse(schemaAsString);
+ }
+ }
+}
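A sketch of wiring these serializers into a job (the environment setup is illustrative; only addAvroSerializers comes from this class):

    import org.apache.avro.generic.GenericData;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;

    public class AvroKryoRegistrationSketch {
        public static void main(String[] args) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // registers the GenericData.Array serializer and the Schema default serializer
            AvroKryoSerializerUtils.addAvroSerializers(env.getConfig(), GenericData.Record.class);
        }
    }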
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataInputDecoder.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataInputDecoder.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataInputDecoder.java
new file mode 100644
index 0000000..32032cc
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataInputDecoder.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import org.apache.avro.io.Decoder;
+import org.apache.avro.util.Utf8;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link Decoder} that reads from a {@link DataInput}.
+ */
+public class DataInputDecoder extends Decoder {
+
+ private final Utf8 stringDecoder = new Utf8();
+
+ private DataInput in;
+
+ public void setIn(DataInput in) {
+ this.in = in;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // primitives
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void readNull() {}
+
+ @Override
+ public boolean readBoolean() throws IOException {
+ return in.readBoolean();
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ return in.readInt();
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ return in.readLong();
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ return in.readFloat();
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ return in.readDouble();
+ }
+
+ @Override
+ public int readEnum() throws IOException {
+ return readInt();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // bytes
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void readFixed(byte[] bytes, int start, int length) throws IOException {
+ in.readFully(bytes, start, length);
+ }
+
+ @Override
+ public ByteBuffer readBytes(ByteBuffer old) throws IOException {
+ int length = readInt();
+ ByteBuffer result;
+ if (old != null && length <= old.capacity() && old.hasArray()) {
+ result = old;
+ result.clear();
+ } else {
+ result = ByteBuffer.allocate(length);
+ }
+ in.readFully(result.array(), result.arrayOffset() + result.position(), length);
+ result.limit(length);
+ return result;
+ }
+
+ @Override
+ public void skipFixed(int length) throws IOException {
+ skipBytes(length);
+ }
+
+ @Override
+ public void skipBytes() throws IOException {
+ int num = readInt();
+ skipBytes(num);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // strings
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public Utf8 readString(Utf8 old) throws IOException {
+ int length = readInt();
+ Utf8 result = (old != null ? old : new Utf8());
+ result.setByteLength(length);
+
+ if (length > 0) {
+ in.readFully(result.getBytes(), 0, length);
+ }
+
+ return result;
+ }
+
+ @Override
+ public String readString() throws IOException {
+ return readString(stringDecoder).toString();
+ }
+
+ @Override
+ public void skipString() throws IOException {
+ int len = readInt();
+ skipBytes(len);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // collection types
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public long readArrayStart() throws IOException {
+ return readVarLongCount(in);
+ }
+
+ @Override
+ public long arrayNext() throws IOException {
+ return readVarLongCount(in);
+ }
+
+ @Override
+ public long skipArray() throws IOException {
+ return readVarLongCount(in);
+ }
+
+ @Override
+ public long readMapStart() throws IOException {
+ return readVarLongCount(in);
+ }
+
+ @Override
+ public long mapNext() throws IOException {
+ return readVarLongCount(in);
+ }
+
+ @Override
+ public long skipMap() throws IOException {
+ return readVarLongCount(in);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // union
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int readIndex() throws IOException {
+ return readInt();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // utils
+ // --------------------------------------------------------------------------------------------
+
+ private void skipBytes(int num) throws IOException {
+ while (num > 0) {
+ num -= in.skipBytes(num);
+ }
+ }
+
+ public static long readVarLongCount(DataInput in) throws IOException {
+ long value = in.readUnsignedByte();
+
+ if ((value & 0x80) == 0) {
+ return value;
+ }
+
+ long curr;
+ int shift = 7;
+ value = value & 0x7f;
+ while (((curr = in.readUnsignedByte()) & 0x80) != 0) {
+ value |= (curr & 0x7f) << shift;
+ shift += 7;
+ }
+ value |= curr << shift;
+ return value;
+ }
+}
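The array/map block counts above use a variable-length encoding with seven payload bits per byte and the high bit as a continuation flag. A small decoding sketch with hand-picked bytes: 0x96 0x01 encodes 150, because 0x96 contributes the low seven bits 0x16 = 22 and 0x01 contributes 1 << 7 = 128.

    import java.io.ByteArrayInputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.IOException;
    import org.apache.flink.formats.avro.utils.DataInputDecoder;

    public class VarLongCountSketch {
        public static void main(String[] args) throws IOException {
            byte[] encoded = {(byte) 0x96, 0x01}; // continuation bit set on the first byte only
            DataInput in = new DataInputStream(new ByteArrayInputStream(encoded));
            System.out.println(DataInputDecoder.readVarLongCount(in)); // prints 150
        }
    }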
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataOutputEncoder.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataOutputEncoder.java
new file mode 100644
index 0000000..c2d490b
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataOutputEncoder.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import org.apache.avro.io.Encoder;
+import org.apache.avro.util.Utf8;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * An {@link Encoder} that writes data to a {@link DataOutput}.
+ */
+public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private DataOutput out;
+
+ public void setOut(DataOutput out) {
+ this.out = out;
+ }
+
+ @Override
+ public void flush() throws IOException {}
+
+ // --------------------------------------------------------------------------------------------
+ // primitives
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void writeNull() {}
+
+ @Override
+ public void writeBoolean(boolean b) throws IOException {
+ out.writeBoolean(b);
+ }
+
+ @Override
+ public void writeInt(int n) throws IOException {
+ out.writeInt(n);
+ }
+
+ @Override
+ public void writeLong(long n) throws IOException {
+ out.writeLong(n);
+ }
+
+ @Override
+ public void writeFloat(float f) throws IOException {
+ out.writeFloat(f);
+ }
+
+ @Override
+ public void writeDouble(double d) throws IOException {
+ out.writeDouble(d);
+ }
+
+ @Override
+ public void writeEnum(int e) throws IOException {
+ out.writeInt(e);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // bytes
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void writeFixed(byte[] bytes, int start, int len) throws IOException {
+ out.write(bytes, start, len);
+ }
+
+ @Override
+ public void writeBytes(byte[] bytes, int start, int len) throws IOException {
+ out.writeInt(len);
+ if (len > 0) {
+ out.write(bytes, start, len);
+ }
+ }
+
+ @Override
+ public void writeBytes(ByteBuffer bytes) throws IOException {
+ int num = bytes.remaining();
+ out.writeInt(num);
+
+ if (num > 0) {
+ writeFixed(bytes);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // strings
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void writeString(String str) throws IOException {
+ byte[] bytes = Utf8.getBytesFor(str);
+ writeBytes(bytes, 0, bytes.length);
+ }
+
+ @Override
+ public void writeString(Utf8 utf8) throws IOException {
+ writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // collection types
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void writeArrayStart() {}
+
+ @Override
+ public void setItemCount(long itemCount) throws IOException {
+ if (itemCount > 0) {
+ writeVarLongCount(out, itemCount);
+ }
+ }
+
+ @Override
+ public void startItem() {}
+
+ @Override
+ public void writeArrayEnd() throws IOException {
+ // write a single byte 0, a shortcut for a var-length long of 0
+ out.write(0);
+ }
+
+ @Override
+ public void writeMapStart() {}
+
+ @Override
+ public void writeMapEnd() throws IOException {
+ // write a single byte 0, a shortcut for a var-length long of 0
+ out.write(0);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // union
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void writeIndex(int unionIndex) throws IOException {
+ out.writeInt(unionIndex);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // utils
+ // --------------------------------------------------------------------------------------------
+
+ public static void writeVarLongCount(DataOutput out, long val) throws IOException {
+ if (val < 0) {
+ throw new IOException("Illegal count (must be non-negative): " + val);
+ }
+
+ while ((val & ~0x7FL) != 0) {
+ out.write(((int) val) | 0x80);
+ val >>>= 7;
+ }
+ out.write((int) val);
+ }
+}
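DataOutputEncoder and DataInputDecoder are symmetric: lengths are written as plain ints and primitives go straight through DataOutput/DataInput, so bytes produced by one can be read back by the other. A round-trip sketch (the stream plumbing is illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.flink.formats.avro.utils.DataInputDecoder;
    import org.apache.flink.formats.avro.utils.DataOutputEncoder;

    public class EncoderDecoderRoundTripSketch {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputEncoder encoder = new DataOutputEncoder();
            encoder.setOut(new DataOutputStream(bytes));
            encoder.writeString("hello");
            encoder.writeLong(42L);

            DataInputDecoder decoder = new DataInputDecoder();
            decoder.setIn(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            System.out.println(decoder.readString()); // hello
            System.out.println(decoder.readLong());   // 42
        }
    }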
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java
new file mode 100644
index 0000000..c00fecb
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import org.apache.avro.file.SeekableInput;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Code copied from org.apache.avro.mapred.FSInput (which is Apache licensed as well).
+ *
+ * <p>The wrapper keeps track of the position in the data stream.
+ */
+public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
+ private final FSDataInputStream stream;
+ private long pos;
+ private final long len;
+
+ public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
+ this.stream = stream;
+ this.pos = 0;
+ this.len = len;
+ }
+
+ public long length() throws IOException {
+ return this.len;
+ }
+
+ public int read(byte[] b, int off, int len) throws IOException {
+ int read = stream.read(b, off, len);
+ // read() returns -1 at the end of the stream; only advance the position for bytes actually read
+ if (read > 0) {
+ pos += read;
+ }
+ return read;
+ }
+
+ public void seek(long p) throws IOException {
+ stream.seek(p);
+ pos = p;
+ }
+
+ public long tell() throws IOException {
+ return pos;
+ }
+
+ public void close() throws IOException {
+ stream.close();
+ }
+}
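A sketch of the intended use: adapting a Flink FSDataInputStream so that Avro's DataFileReader can seek within it (the path and the generic reader are assumptions for illustration):

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.FSDataInputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.avro.utils.FSDataInputStreamWrapper;

    public class SeekableAvroReadSketch {
        public static void main(String[] args) throws Exception {
            Path path = new Path("file:///tmp/records.avro"); // assumed sample file
            FileSystem fs = path.getFileSystem();
            long length = fs.getFileStatus(path).getLen();
            FSDataInputStream stream = fs.open(path);
            DataFileReader<GenericRecord> reader = new DataFileReader<>(
                    new FSDataInputStreamWrapper(stream, length),
                    new GenericDatumReader<GenericRecord>());
            while (reader.hasNext()) {
                System.out.println(reader.next());
            }
            reader.close();
        }
    }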
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/assembly/test-assembly.xml b/flink-formats/flink-avro/src/test/assembly/test-assembly.xml
new file mode 100644
index 0000000..8361693
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/assembly/test-assembly.xml
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<assembly>
+ <id>test-jar</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.testOutputDirectory}</directory>
+ <outputDirectory></outputDirectory>
+ <!--modify/add include to match your package(s) -->
+ <includes>
+ <include>org/apache/flink/formats/avro/testjar/**</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+</assembly>
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
new file mode 100644
index 0000000..985471a
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+
+/**
+ * IT case for the {@link AvroExternalJarProgram}.
+ */
+public class AvroExternalJarProgramITCase extends TestLogger {
+
+ private static final String JAR_FILE = "maven-test-jar.jar";
+
+ private static final String TEST_DATA_FILE = "/testdata.avro";
+
+ @Test
+ public void testExternalProgram() {
+
+ LocalFlinkMiniCluster testMiniCluster = null;
+
+ try {
+ int parallelism = 4;
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+ testMiniCluster = new LocalFlinkMiniCluster(config, false);
+ testMiniCluster.start();
+
+ String jarFile = JAR_FILE;
+ String testData = getClass().getResource(TEST_DATA_FILE).toString();
+
+ PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+
+ TestEnvironment.setAsContext(
+ testMiniCluster,
+ parallelism,
+ Collections.singleton(new Path(jarFile)),
+ Collections.<URL>emptyList());
+
+ config.setString(JobManagerOptions.ADDRESS, "localhost");
+ config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
+
+ program.invokeInteractiveModeForExecution();
+ }
+ catch (Throwable t) {
+ System.err.println(t.getMessage());
+ t.printStackTrace();
+ Assert.fail("Error during the packaged program execution: " + t.getMessage());
+ }
+ finally {
+ TestEnvironment.unsetAsContext();
+
+ if (testMiniCluster != null) {
+ try {
+ testMiniCluster.stop();
+ } catch (Throwable t) {
+ // ignore
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java
new file mode 100644
index 0000000..bc4f253
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the type extraction of the {@link AvroInputFormat}.
+ */
+public class AvroInputFormatTypeExtractionTest {
+
+ @Test
+ public void testTypeExtraction() {
+ try {
+ InputFormat<MyAvroType, ?> format = new AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), MyAvroType.class);
+
+ TypeInformation<?> typeInfoDirect = TypeExtractor.getInputFormatTypes(format);
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<MyAvroType> input = env.createInput(format);
+ TypeInformation<?> typeInfoDataSet = input.getType();
+
+ Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo);
+ Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo);
+
+ Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass());
+ Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Test type.
+ */
+ public static final class MyAvroType {
+
+ public String theString;
+
+ public MyAvroType recursive;
+
+ private double aDouble;
+
+ public double getaDouble() {
+ return aDouble;
+ }
+
+ public void setaDouble(double aDouble) {
+ this.aDouble = aDouble;
+ }
+
+ public void setTheString(String theString) {
+ this.theString = theString;
+ }
+
+ public String getTheString() {
+ return theString;
+ }
+ }
+}