You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/12 14:48:49 UTC
[70/73] [abbrv] prefix all projects in addons and quickstarts with
flink-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/pom.xml b/flink-addons/flink-avro/pom.xml
new file mode 100644
index 0000000..dc06c09
--- /dev/null
+++ b/flink-addons/flink-avro/pom.xml
@@ -0,0 +1,158 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-addons</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>0.6-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-avro</artifactId>
+ <name>flink-avro</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.7.5</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <version>1.7.5</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Exclude ExternalJar contents from regular build -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/src/test/java/org/apache/flink/api/avro/testjar/*.java</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>create-test-dependency</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.flink.api.avro.testjar.AvroExternalJarProgram</mainClass>
+ </manifest>
+ </archive>
+ <finalName>maven</finalName>
+ <attach>false</attach>
+ <descriptors>
+ <descriptor>src/test/assembly/test-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>
+ org.apache.maven.plugins
+ </groupId>
+ <artifactId>
+ maven-assembly-plugin
+ </artifactId>
+ <versionRange>
+ [2.4,)
+ </versionRange>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
+ <profiles>
+ <profile>
+ <!-- A bug in Java 6 causes the javadoc generation to fail,
+ apparently because the test case references JUnit.
+ See https://github.com/stratosphere/stratosphere/pull/405#issuecomment-32591978
+ for further links. -->
+ <id>disable-javadocs-in-java6</id>
+ <activation>
+ <jdk>(,1.6]</jdk> <!-- disable javadocs for java6 or lower. -->
+ </activation>
+ <properties>
+ <maven.javadoc.skip>true</maven.javadoc.skip>
+ </properties>
+ </profile>
+ </profiles>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
new file mode 100644
index 0000000..8b58a98
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Key;
+import org.apache.flink.util.ReflectionUtil;
+
+
+/**
+ * Base class for values that wrap an Avro datum of type {@code T} and can be used as a
+ * {@link Key}.
+ *
+ * <p>The binary form written by {@link #write(DataOutputView)} is a boolean flag
+ * ({@code true} if a datum follows) and then the datum, encoded by a
+ * {@link ReflectDatumWriter} through a {@link DataOutputEncoder}.
+ *
+ * @param <T> The type of the wrapped datum.
+ */
+public abstract class AvroBaseValue<T> extends AvroValue<T> implements Key<AvroBaseValue<T>> {
+
+    private static final long serialVersionUID = 1L;
+
+
+    /** Creates a value through the no-argument super constructor. */
+    public AvroBaseValue() {}
+
+    /**
+     * Creates a value that wraps the given datum.
+     *
+     * @param datum The datum to wrap.
+     */
+    public AvroBaseValue(T datum) {
+        super(datum);
+    }
+
+
+    // --------------------------------------------------------------------------------------------
+    // Serialization / Deserialization
+    // --------------------------------------------------------------------------------------------
+
+    // The four helpers below are caches that the getters (re-)create lazily. They are
+    // transient so that Java serialization of a value never attempts to write them.
+    private transient ReflectDatumWriter<T> writer;
+    private transient ReflectDatumReader<T> reader;
+
+    private transient DataOutputEncoder encoder;
+    private transient DataInputDecoder decoder;
+
+
+    /**
+     * Writes the presence flag and, if a datum is set, the datum itself.
+     *
+     * @param out The view to write to.
+     * @throws IOException Thrown if writing the flag or encoding the datum fails.
+     */
+    @Override
+    public void write(DataOutputView out) throws IOException {
+        // the null flag
+        if (datum() == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+
+            DataOutputEncoder encoder = getEncoder();
+            encoder.setOut(out);
+            getWriter().write(datum(), encoder);
+        }
+    }
+
+    /**
+     * Reads the presence flag and, if it is set, the datum that follows it.
+     *
+     * <p>When the flag is not set, the datum is reset to {@code null} so that a reused
+     * instance does not keep the datum of a previously read value.
+     *
+     * @param in The view to read from.
+     * @throws IOException Thrown if reading the flag or decoding the datum fails.
+     */
+    @Override
+    public void read(DataInputView in) throws IOException {
+        // the null flag
+        if (in.readBoolean()) {
+            DataInputDecoder decoder = getDecoder();
+            decoder.setIn(in);
+            // the current datum is handed to the reader as the object to reuse
+            datum(getReader().read(datum(), decoder));
+        } else {
+            datum(null);
+        }
+    }
+
+    /**
+     * Returns the cached datum writer, creating it from the runtime class of the current
+     * datum on first use. Must only be called while a datum is set.
+     */
+    private ReflectDatumWriter<T> getWriter() {
+        if (this.writer == null) {
+            @SuppressWarnings("unchecked")
+            Class<T> clazz = (Class<T>) datum().getClass();
+            this.writer = new ReflectDatumWriter<T>(clazz);
+        }
+        return this.writer;
+    }
+
+    /**
+     * Returns the cached datum reader, creating it on first use for the type that
+     * {@link ReflectionUtil#getTemplateType1(Class)} reports for this class.
+     */
+    private ReflectDatumReader<T> getReader() {
+        if (this.reader == null) {
+            Class<T> datumClass = ReflectionUtil.getTemplateType1(getClass());
+            this.reader = new ReflectDatumReader<T>(datumClass);
+        }
+        return this.reader;
+    }
+
+    /** Returns the cached encoder, creating it on first use. */
+    private DataOutputEncoder getEncoder() {
+        if (this.encoder == null) {
+            this.encoder = new DataOutputEncoder();
+        }
+        return this.encoder;
+    }
+
+    /** Returns the cached decoder, creating it on first use. */
+    private DataInputDecoder getDecoder() {
+        if (this.decoder == null) {
+            this.decoder = new DataInputDecoder();
+        }
+        return this.decoder;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Hashing / Equality
+    // --------------------------------------------------------------------------------------------
+
+    /** Returns the hash code of the datum, or {@code 0} if no datum is set. */
+    @Override
+    public int hashCode() {
+        return datum() == null ? 0 : datum().hashCode();
+    }
+
+    /**
+     * Two values are equal if they are of exactly the same class and their datums are
+     * equal (or both {@code null}).
+     *
+     * @param obj The object to compare with, may be {@code null}.
+     * @return {@code true} if the objects are equal, {@code false} otherwise.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        // equals(null) must answer false instead of failing with a NullPointerException
+        if (obj == null || obj.getClass() != this.getClass()) {
+            return false;
+        }
+
+        Object otherDatum = ((AvroBaseValue<?>) obj).datum();
+        Object thisDatum = datum();
+        return thisDatum == null ? otherDatum == null : thisDatum.equals(otherDatum);
+    }
+
+    @Override
+    public String toString() {
+        return "AvroBaseValue (" + datum() + ")";
+    }
+
+    /**
+     * Compares the datums of the two values; a missing datum sorts before any datum.
+     *
+     * @param o The value to compare with.
+     * @return A negative number, zero, or a positive number, following {@link Comparable}.
+     * @throws ClassCastException Thrown if both datums are set and this datum is not
+     *         {@link Comparable}.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public int compareTo(AvroBaseValue<T> o) {
+        Object otherDatum = o.datum();
+        Object thisDatum = datum();
+
+        if (thisDatum == null) {
+            return otherDatum == null ? 0 : -1;
+        } else {
+            return otherDatum == null ? 1 : ((Comparable<Object>) thisDatum).compareTo(otherDatum);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
new file mode 100644
index 0000000..3c9fd34
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.io.Decoder;
+import org.apache.avro.util.Utf8;
+
+
+/**
+ * An Avro {@link Decoder} that reads from a {@link DataInput}.
+ *
+ * <p>Primitives are read with the matching {@link DataInput} methods. Lengths of byte
+ * sequences and strings, enum ordinals and union indices are read as plain ints. The
+ * counts for arrays and maps are read with {@link #readVarLongCount(DataInput)}.
+ */
+public class DataInputDecoder extends Decoder {
+
+    /** Reusable target for {@link #readString()}. */
+    private final Utf8 stringDecoder = new Utf8();
+
+    private DataInput in;
+
+    /**
+     * Sets the input that subsequent read and skip calls operate on.
+     *
+     * @param in The input to read from.
+     */
+    public void setIn(DataInput in) {
+        this.in = in;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // primitives
+    // --------------------------------------------------------------------------------------------
+
+    /** Consumes no bytes. */
+    @Override
+    public void readNull() {}
+
+    @Override
+    public boolean readBoolean() throws IOException {
+        return in.readBoolean();
+    }
+
+    @Override
+    public int readInt() throws IOException {
+        return in.readInt();
+    }
+
+    @Override
+    public long readLong() throws IOException {
+        return in.readLong();
+    }
+
+    @Override
+    public float readFloat() throws IOException {
+        return in.readFloat();
+    }
+
+    @Override
+    public double readDouble() throws IOException {
+        return in.readDouble();
+    }
+
+    @Override
+    public int readEnum() throws IOException {
+        return readInt();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // bytes
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public void readFixed(byte[] bytes, int start, int length) throws IOException {
+        in.readFully(bytes, start, length);
+    }
+
+    /**
+     * Reads an int length followed by that many bytes.
+     *
+     * @param old A buffer to reuse; it is used if it is array-backed and large enough,
+     *            otherwise a new buffer is allocated. May be {@code null}.
+     * @return The buffer holding the bytes, with its limit set to the length that was read.
+     */
+    @Override
+    public ByteBuffer readBytes(ByteBuffer old) throws IOException {
+        int length = readInt();
+        ByteBuffer result;
+        if (old != null && length <= old.capacity() && old.hasArray()) {
+            result = old;
+            result.clear();
+        } else {
+            result = ByteBuffer.allocate(length);
+        }
+        in.readFully(result.array(), result.arrayOffset() + result.position(), length);
+        result.limit(length);
+        return result;
+    }
+
+    @Override
+    public void skipFixed(int length) throws IOException {
+        skipBytes(length);
+    }
+
+    /** Skips a byte sequence: reads its int length and skips that many bytes. */
+    @Override
+    public void skipBytes() throws IOException {
+        int num = readInt();
+        skipBytes(num);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // strings
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Reads a string as an int byte length followed by that many bytes.
+     *
+     * @param old The object to reuse, or {@code null} to create a new one.
+     * @return The {@link Utf8} holding the bytes that were read.
+     */
+    @Override
+    public Utf8 readString(Utf8 old) throws IOException {
+        int length = readInt();
+        Utf8 result = (old != null ? old : new Utf8());
+        result.setByteLength(length);
+
+        if (length > 0) {
+            in.readFully(result.getBytes(), 0, length);
+        }
+
+        return result;
+    }
+
+    /** Reads a string into the internal reusable buffer and returns it as a String. */
+    @Override
+    public String readString() throws IOException {
+        return readString(stringDecoder).toString();
+    }
+
+    @Override
+    public void skipString() throws IOException {
+        int len = readInt();
+        skipBytes(len);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // collection types
+    // --------------------------------------------------------------------------------------------
+
+    // All six methods below return the next variable-length count from the input.
+
+    @Override
+    public long readArrayStart() throws IOException {
+        return readVarLongCount(in);
+    }
+
+    @Override
+    public long arrayNext() throws IOException {
+        return readVarLongCount(in);
+    }
+
+    @Override
+    public long skipArray() throws IOException {
+        return readVarLongCount(in);
+    }
+
+    @Override
+    public long readMapStart() throws IOException {
+        return readVarLongCount(in);
+    }
+
+    @Override
+    public long mapNext() throws IOException {
+        return readVarLongCount(in);
+    }
+
+    @Override
+    public long skipMap() throws IOException {
+        return readVarLongCount(in);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // union
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public int readIndex() throws IOException {
+        return readInt();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // utils
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Skips exactly {@code num} bytes of the input.
+     *
+     * <p>{@link DataInput#skipBytes(int)} may skip fewer bytes than requested, including
+     * none at all, and never signals the end of the input. When it makes no progress, a
+     * single byte is read instead, which raises an {@link java.io.EOFException} at the end
+     * of the input rather than looping forever.
+     *
+     * @param num The number of bytes to skip.
+     * @throws IOException Thrown if the input ends early or another I/O error occurs.
+     */
+    private void skipBytes(int num) throws IOException {
+        while (num > 0) {
+            int skipped = in.skipBytes(num);
+            if (skipped <= 0) {
+                in.readByte();
+                skipped = 1;
+            }
+            num -= skipped;
+        }
+    }
+
+    /**
+     * Reads a non-negative count in the variable-length format written by
+     * {@link DataOutputEncoder#writeVarLongCount(java.io.DataOutput, long)}: seven payload
+     * bits per byte, least significant group first, with the high bit set on every byte
+     * except the last one.
+     *
+     * @param in The input to read from.
+     * @return The decoded count.
+     * @throws IOException Thrown if reading from the input fails.
+     */
+    public static long readVarLongCount(DataInput in) throws IOException {
+        long value = in.readUnsignedByte();
+
+        if ((value & 0x80) == 0) {
+            return value;
+        }
+        else {
+            long curr;
+            int shift = 7;
+            value = value & 0x7f;
+            while (((curr = in.readUnsignedByte()) & 0x80) != 0){
+                value |= (curr & 0x7f) << shift;
+                shift += 7;
+            }
+            value |= curr << shift;
+            return value;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
new file mode 100644
index 0000000..5463237
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.io.Encoder;
+import org.apache.avro.util.Utf8;
+
+
+/**
+ * An Avro {@link Encoder} that writes to a {@link DataOutput}.
+ *
+ * <p>Primitives are written with the matching {@link DataOutput} methods. Lengths of byte
+ * sequences and strings, enum ordinals and union indices are written as plain ints. Item
+ * counts for arrays and maps are written with {@link #writeVarLongCount(DataOutput, long)},
+ * and the end of an array or map is marked by a single zero byte.
+ */
+public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // The target is a per-call handle installed through setOut(). It is transient because
+    // DataOutput is not a serializable type, so it must not become part of the serialized form.
+    private transient DataOutput out;
+
+
+    /**
+     * Sets the output that subsequent write calls operate on.
+     *
+     * @param out The output to write to.
+     */
+    public void setOut(DataOutput out) {
+        this.out = out;
+    }
+
+    /** Does nothing; this encoder keeps no buffer of its own. */
+    @Override
+    public void flush() throws IOException {}
+
+    // --------------------------------------------------------------------------------------------
+    // primitives
+    // --------------------------------------------------------------------------------------------
+
+    /** Writes no bytes. */
+    @Override
+    public void writeNull() {}
+
+    @Override
+    public void writeBoolean(boolean b) throws IOException {
+        out.writeBoolean(b);
+    }
+
+    @Override
+    public void writeInt(int n) throws IOException {
+        out.writeInt(n);
+    }
+
+    @Override
+    public void writeLong(long n) throws IOException {
+        out.writeLong(n);
+    }
+
+    @Override
+    public void writeFloat(float f) throws IOException {
+        out.writeFloat(f);
+    }
+
+    @Override
+    public void writeDouble(double d) throws IOException {
+        out.writeDouble(d);
+    }
+
+    @Override
+    public void writeEnum(int e) throws IOException {
+        out.writeInt(e);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // bytes
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public void writeFixed(byte[] bytes, int start, int len) throws IOException {
+        out.write(bytes, start, len);
+    }
+
+    /** Writes the length as an int, followed by the bytes if there are any. */
+    @Override
+    public void writeBytes(byte[] bytes, int start, int len) throws IOException {
+        out.writeInt(len);
+        if (len > 0) {
+            out.write(bytes, start, len);
+        }
+    }
+
+    /** Writes the number of remaining bytes as an int, followed by those bytes if there are any. */
+    @Override
+    public void writeBytes(ByteBuffer bytes) throws IOException {
+        int num = bytes.remaining();
+        out.writeInt(num);
+
+        if (num > 0) {
+            writeFixed(bytes);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // strings
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public void writeString(String str) throws IOException {
+        byte[] bytes = Utf8.getBytesFor(str);
+        writeBytes(bytes, 0, bytes.length);
+    }
+
+    @Override
+    public void writeString(Utf8 utf8) throws IOException {
+        writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // collection types
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public void writeArrayStart() {}
+
+    /** Writes a positive item count as a variable-length long; other counts write nothing. */
+    @Override
+    public void setItemCount(long itemCount) throws IOException {
+        if (itemCount > 0) {
+            writeVarLongCount(out, itemCount);
+        }
+    }
+
+    @Override
+    public void startItem() {}
+
+    @Override
+    public void writeArrayEnd() throws IOException {
+        // write a single byte 0, shortcut for a var-length long of 0
+        out.write(0);
+    }
+
+    @Override
+    public void writeMapStart() {}
+
+    @Override
+    public void writeMapEnd() throws IOException {
+        // write a single byte 0, shortcut for a var-length long of 0
+        out.write(0);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // union
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public void writeIndex(int unionIndex) throws IOException {
+        out.writeInt(unionIndex);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // utils
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Writes a non-negative count in a variable-length format: seven payload bits per
+     * byte, least significant group first, with the high bit set on every byte except
+     * the last one.
+     *
+     * @param out The output to write to.
+     * @param val The count to write.
+     * @throws IOException Thrown if the count is negative or writing fails.
+     */
+    public static final void writeVarLongCount(DataOutput out, long val) throws IOException {
+        if (val < 0) {
+            throw new IOException("Illegal count (must be non-negative): " + val);
+        }
+
+        while ((val & ~0x7FL) != 0) {
+            out.write(((int) val) | 0x80);
+            val >>>= 7;
+        }
+        out.write((int) val);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
new file mode 100644
index 0000000..cb4a739
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.avro;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.avro.file.SeekableInput;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+
+/**
+ * Adapts a Flink {@link FSDataInputStream} to Avro's {@link SeekableInput}.
+ * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well).
+ *
+ * <p>The wrapper keeps track of the position in the data stream.
+ */
+public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
+    private final FSDataInputStream stream;
+    private final long len;
+    private long pos;
+
+    /**
+     * Creates a wrapper around the given stream, starting at position 0.
+     *
+     * @param stream The stream to read from.
+     * @param len The length that {@link #length()} reports.
+     */
+    public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
+        this.stream = stream;
+        this.len = len;
+        this.pos = 0;
+    }
+
+    @Override
+    public long length() {
+        return len;
+    }
+
+    /**
+     * Reads from the wrapped stream and advances the tracked position by the number of
+     * bytes actually read.
+     *
+     * @return The number of bytes read, as returned by the wrapped stream.
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        int read = stream.read(b, off, len);
+        // a negative result (end of stream) must not move the tracked position backwards
+        if (read > 0) {
+            pos += read;
+        }
+        return read;
+    }
+
+    @Override
+    public void seek(long p) throws IOException {
+        stream.seek(p);
+        pos = p;
+    }
+
+    @Override
+    public long tell() throws IOException {
+        return pos;
+    }
+
+    @Override
+    public void close() throws IOException {
+        stream.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
new file mode 100644
index 0000000..1031d81
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.io;
+
+import java.io.IOException;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.avro.FSDataInputStreamWrapper;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.TypeInformation;
+import org.apache.flink.util.InstantiationUtil;
+
+
+/**
+ * A file input format that reads records of type {@code E} from Avro data files.
+ *
+ * <p>Types that extend {@code SpecificRecordBase} are read with a
+ * {@link SpecificDatumReader}, all other types with a {@link ReflectDatumReader}.
+ * The format marks its input as unsplittable.
+ *
+ * @param <E> The type of the records produced by this format.
+ */
+public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Log LOG = LogFactory.getLog(AvroInputFormat.class);
+
+
+    private final Class<E> avroValueType;
+
+    private boolean reuseAvroValue = true;
+
+
+    private transient FileReader<E> dataFileReader;
+
+
+    /**
+     * Creates an input format that reads records of the given type from the given path.
+     *
+     * @param filePath The path of the file(s) to read.
+     * @param type The class of the records to produce.
+     */
+    public AvroInputFormat(Path filePath, Class<E> type) {
+        super(filePath);
+        this.avroValueType = type;
+        this.unsplittable = true;
+    }
+
+
+    /**
+     * Sets the flag whether to reuse the Avro value instance for all records.
+     * By default, the input format reuses the Avro value.
+     *
+     * @param reuseAvroValue True, if the input format should reuse the Avro value instance, false otherwise.
+     */
+    public void setReuseAvroValue(boolean reuseAvroValue) {
+        this.reuseAvroValue = reuseAvroValue;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Typing
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public TypeInformation<E> getProducedType() {
+        return TypeExtractor.getForClass(this.avroValueType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Input Format Methods
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Opens the given split, creates the Avro reader for it and syncs the reader to the
+     * start offset of the split.
+     *
+     * @param split The split to read.
+     * @throws IOException Thrown if the split or the Avro reader cannot be opened.
+     */
+    @Override
+    public void open(FileInputSplit split) throws IOException {
+        super.open(split);
+
+        DatumReader<E> datumReader;
+        if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
+            datumReader = new SpecificDatumReader<E>(avroValueType);
+        } else {
+            datumReader = new ReflectDatumReader<E>(avroValueType);
+        }
+
+        LOG.info("Opening split " + split);
+
+        // pass the length on unchanged; narrowing it to an int corrupts it beyond 2 GB
+        SeekableInput in = new FSDataInputStreamWrapper(stream, split.getLength());
+
+        dataFileReader = DataFileReader.openReader(in, datumReader);
+        dataFileReader.sync(split.getStart());
+    }
+
+    @Override
+    public boolean reachedEnd() throws IOException {
+        return !dataFileReader.hasNext();
+    }
+
+    /**
+     * Reads the next record.
+     *
+     * @param reuseValue The object to read into; replaced by a new instance if reuse is disabled.
+     * @return The record that was read, or {@code null} if the reader has no more records.
+     * @throws IOException Thrown if reading the record fails.
+     */
+    @Override
+    public E nextRecord(E reuseValue) throws IOException {
+        if (!dataFileReader.hasNext()) {
+            return null;
+        }
+
+        if (!reuseAvroValue) {
+            reuseValue = InstantiationUtil.instantiate(avroValueType, Object.class);
+        }
+
+        reuseValue = dataFileReader.next(reuseValue);
+        return reuseValue;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
new file mode 100644
index 0000000..56c8214
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.io;
+
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * A file output format that writes records of type {@code E} to an Avro data file.
+ *
+ * <p>Types that extend {@code SpecificRecordBase} are written with a
+ * {@link SpecificDatumWriter}, all other types with a {@link ReflectDatumWriter}.
+ * A schema set through {@link #setSchema(Schema)} takes precedence over the schema
+ * derived from the record type.
+ *
+ * @param <E> The type of the records written by this format.
+ */
+public class AvroOutputFormat<E> extends FileOutputFormat<E> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Class<E> avroValueType;
+
+    private Schema userDefinedSchema = null;
+
+    private transient DataFileWriter<E> dataFileWriter;
+
+
+    /**
+     * Creates an output format that writes records of the given type to the given path.
+     *
+     * @param filePath The path to write to.
+     * @param type The class of the records to write.
+     */
+    public AvroOutputFormat(Path filePath, Class<E> type) {
+        super(filePath);
+        this.avroValueType = type;
+    }
+
+    /**
+     * Creates an output format for records of the given type without setting a path.
+     *
+     * @param type The class of the records to write.
+     */
+    public AvroOutputFormat(Class<E> type) {
+        this.avroValueType = type;
+    }
+
+    /**
+     * Sets the schema to create the Avro file with, instead of the schema derived from
+     * the record type.
+     *
+     * @param schema The schema to use, or {@code null} to use the derived schema.
+     */
+    public void setSchema(Schema schema) {
+        this.userDefinedSchema = schema;
+    }
+
+    @Override
+    public void writeRecord(E record) throws IOException {
+        dataFileWriter.append(record);
+    }
+
+    /**
+     * Opens the output stream and creates the Avro file writer on top of it.
+     *
+     * @throws IOException Thrown if the stream or the Avro file cannot be created.
+     * @throws RuntimeException Thrown if a specific record type cannot be instantiated
+     *         to obtain its schema; the original exception is attached as the cause.
+     */
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        super.open(taskNumber, numTasks);
+        DatumWriter<E> datumWriter;
+        Schema schema = null;
+        if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
+            datumWriter = new SpecificDatumWriter<E>(avroValueType);
+            try {
+                schema = ((org.apache.avro.specific.SpecificRecordBase) avroValueType.newInstance()).getSchema();
+            } catch (InstantiationException e) {
+                // keep the message as before, but do not lose the cause and its stack trace
+                throw new RuntimeException(e.getMessage(), e);
+            } catch (IllegalAccessException e) {
+                throw new RuntimeException(e.getMessage(), e);
+            }
+        } else {
+            datumWriter = new ReflectDatumWriter<E>(avroValueType);
+            schema = ReflectData.get().getSchema(avroValueType);
+        }
+        dataFileWriter = new DataFileWriter<E>(datumWriter);
+        if (userDefinedSchema == null) {
+            dataFileWriter.create(schema, stream);
+        } else {
+            dataFileWriter.create(userDefinedSchema, stream);
+        }
+    }
+
+    /**
+     * Flushes and closes the Avro file writer, then closes the format itself.
+     *
+     * <p>The writer may be missing if {@link #open(int, int)} failed or was never called,
+     * and the super class is closed even if flushing or closing the writer fails.
+     *
+     * @throws IOException Thrown if flushing or closing fails.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            if (dataFileWriter != null) {
+                dataFileWriter.flush();
+                dataFileWriter.close();
+            }
+        } finally {
+            super.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
new file mode 100644
index 0000000..ab96895
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.record.io.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.avro.AvroBaseValue;
+import org.apache.flink.api.avro.FSDataInputStreamWrapper;
+import org.apache.flink.api.java.record.io.FileInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.ReflectionUtil;
+
+
+/**
+ * File input format that reads Avro datums of type {@code E} and hands each one out wrapped in an
+ * {@link AvroBaseValue} stored in field 0 of the returned {@link Record}.
+ *
+ * Both constructors set {@code unsplittable} to {@code true}.
+ */
+public class AvroInputFormat<E> extends FileInputFormat {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Log LOG = LogFactory.getLog(AvroInputFormat.class);
+
+    /** Wrapper class that is instantiated once per opened split. */
+    private final Class<? extends AvroBaseValue<E>> avroWrapperTypeClass;
+
+    /** Datum class; decides whether the specific or the reflective datum reader is used. */
+    private final Class<E> avroValueType;
+
+    private transient FileReader<E> dataFileReader;
+
+    private transient E reuseAvroValue;
+
+    private transient AvroBaseValue<E> wrapper;
+
+    /**
+     * Creates the format from the wrapper class alone; the datum type is obtained through
+     * {@code ReflectionUtil.getTemplateType1}.
+     *
+     * @param wrapperClass the {@link AvroBaseValue} subclass that wraps the datums
+     */
+    public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass) {
+        this.unsplittable = true;
+        this.avroWrapperTypeClass = wrapperClass;
+        this.avroValueType = ReflectionUtil.getTemplateType1(wrapperClass);
+    }
+
+    /**
+     * Creates the format from an explicit wrapper class and datum type.
+     *
+     * @param wrapperClass the {@link AvroBaseValue} subclass that wraps the datums
+     * @param avroType the class of the Avro datums
+     */
+    public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass, Class<E> avroType) {
+        this.unsplittable = true;
+        this.avroWrapperTypeClass = wrapperClass;
+        this.avroValueType = avroType;
+    }
+
+    /**
+     * Opens the split through the superclass, instantiates the wrapper, creates an Avro file reader
+     * on {@code stream} and syncs it to the start of the split.
+     */
+    @Override
+    public void open(FileInputSplit split) throws IOException {
+        super.open(split);
+
+        this.wrapper = InstantiationUtil.instantiate(avroWrapperTypeClass, AvroBaseValue.class);
+
+        final boolean isSpecificRecord =
+            org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType);
+        final DatumReader<E> reader = isSpecificRecord
+            ? new SpecificDatumReader<E>(avroValueType)
+            : new ReflectDatumReader<E>(avroValueType);
+
+        LOG.info("Opening split " + split);
+
+        final SeekableInput seekableInput = new FSDataInputStreamWrapper(stream, (int) split.getLength());
+
+        dataFileReader = DataFileReader.openReader(seekableInput, reader);
+        dataFileReader.sync(split.getStart());
+
+        reuseAvroValue = null;
+    }
+
+    /** Returns {@code true} once the Avro file reader has no further datum. */
+    @Override
+    public boolean reachedEnd() throws IOException {
+        final boolean hasMore = dataFileReader.hasNext();
+        return !hasMore;
+    }
+
+    /**
+     * Reads the next datum, puts it into the wrapper and stores the wrapper in field 0 of the given
+     * record.
+     *
+     * @return the given record, or {@code null} if the reader has no further datum
+     */
+    @Override
+    public Record nextRecord(Record record) throws IOException {
+        if (dataFileReader.hasNext()) {
+            reuseAvroValue = dataFileReader.next(reuseAvroValue);
+            wrapper.datum(reuseAvroValue);
+            record.setField(0, wrapper);
+            return record;
+        }
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
new file mode 100644
index 0000000..1464ca9
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.record.io.avro;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.avro.FSDataInputStreamWrapper;
+import org.apache.flink.api.java.record.io.FileInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.BooleanValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.ListValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.MapValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.Value;
+
+/**
+ * Input format to read Avro files into {@link Record}s, one record field per Avro field.
+ *
+ * Only flat Avro schemas are supported. A field may be
+ * <ul>
+ * <li>a primitive (string, int, boolean, double, float, long, null),</li>
+ * <li>an enum, which is emitted as a {@link StringValue},</li>
+ * <li>an array, or a map with string keys, whose elements/values are string, int, boolean,
+ * double, float or long, or</li>
+ * <li>a nullable primitive, i.e. a two-member union of a primitive type and null such as
+ * ["string", "null"].</li>
+ * </ul>
+ * Nested records, unions of two non-null types and all remaining types (e.g. FIXED) are rejected
+ * with a {@link RuntimeException}. See
+ * http://avro.apache.org/docs/current/spec.html#schema_complex
+ */
+public class AvroRecordInputFormat extends FileInputFormat {
+    private static final long serialVersionUID = 1L;
+
+    private static final Log LOG = LogFactory.getLog(AvroRecordInputFormat.class);
+
+    // Runtime-only state, rebuilt in open(); transient like the corresponding fields of AvroInputFormat.
+    private transient FileReader<GenericRecord> dataFileReader;
+    private transient GenericRecord reuseAvroRecord = null;
+
+    /**
+     * Opens the split through the superclass, creates a generic Avro file reader on {@code stream}
+     * and syncs it to the start of the split.
+     */
+    @Override
+    public void open(FileInputSplit split) throws IOException {
+        super.open(split);
+        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+        SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());
+        LOG.info("Opening split " + split);
+        dataFileReader = DataFileReader.openReader(in, datumReader);
+        dataFileReader.sync(split.getStart());
+    }
+
+    /** Returns {@code true} once the Avro file reader has no further record. */
+    @Override
+    public boolean reachedEnd() throws IOException {
+        return !dataFileReader.hasNext();
+    }
+
+    /**
+     * Reads the next Avro record and copies every field, converted to a {@link Value}, into the
+     * given record at the field's schema position.
+     *
+     * @param record the record to fill; must not be {@code null}
+     * @return the given record, or {@code null} if the reader has no further record
+     * @throws IllegalArgumentException if {@code record} is {@code null}
+     */
+    @Override
+    public Record nextRecord(Record record) throws IOException {
+        if (!dataFileReader.hasNext()) {
+            return null;
+        }
+        if (record == null) {
+            throw new IllegalArgumentException("Empty PactRecord given");
+        }
+        reuseAvroRecord = dataFileReader.next(reuseAvroRecord);
+        final List<Field> fields = reuseAvroRecord.getSchema().getFields();
+        for (Field field : fields) {
+            final Value value = convertAvroToPactValue(field, reuseAvroRecord.get(field.pos()));
+            record.setField(field.pos(), value);
+            // Primitive and enum values come back in holder objects (sString, sInt, ...) that are
+            // shared by all fields of the same kind, so this call stays inside the loop.
+            // NOTE(review): presumably it serializes the field before the next one overwrites the
+            // holder - confirm against Record before hoisting it out of the loop.
+            record.updateBinaryRepresenation();
+        }
+
+        return record;
+    }
+
+    /**
+     * Converts the value of one Avro field. Arrays, enums and maps are handled here; everything
+     * else is passed on to the primitive conversion.
+     *
+     * @return the converted value, or {@code null} if the Avro value is {@code null}
+     */
+    @SuppressWarnings("unchecked")
+    private final Value convertAvroToPactValue(final Field field, final Object avroRecord) {
+        if (avroRecord == null) {
+            return null;
+        }
+        final Type type = checkTypeConstraintsAndGetType(field.schema());
+
+        // check for complex types
+        // (complex type FIXED is not yet supported)
+        switch (type) {
+            case ARRAY:
+                final Type elementType = field.schema().getElementType().getType();
+                final List<?> avroList = (List<?>) avroRecord;
+                return convertAvroArrayToListValue(elementType, avroList);
+            case ENUM:
+                final List<String> symbols = field.schema().getEnumSymbols();
+                final String avroRecordString = avroRecord.toString();
+                if (!symbols.contains(avroRecordString)) {
+                    throw new RuntimeException("The given Avro file contains field with a invalid enum symbol");
+                }
+                sString.setValue(avroRecordString);
+                return sString;
+            case MAP:
+                final Type valueType = field.schema().getValueType().getType();
+                final Map<CharSequence, ?> avroMap = (Map<CharSequence, ?>) avroRecord;
+                return convertAvroMapToMapValue(valueType, avroMap);
+
+            // primitive type
+            default:
+                return convertAvroPrimitiveToValue(type, avroRecord);
+
+        }
+    }
+
+    /**
+     * Copies an Avro array into a newly created list value of the matching element type.
+     *
+     * @throws RuntimeException if the element type is not string, int, boolean, double, float or long
+     */
+    private final ListValue<?> convertAvroArrayToListValue(Type elementType, List<?> avroList) {
+        switch (elementType) {
+            case STRING:
+                StringListValue sl = new StringListValue();
+                for (Object item : avroList) {
+                    sl.add(new StringValue((CharSequence) item));
+                }
+                return sl;
+            case INT:
+                IntListValue il = new IntListValue();
+                for (Object item : avroList) {
+                    il.add(new IntValue((Integer) item));
+                }
+                return il;
+            case BOOLEAN:
+                BooleanListValue bl = new BooleanListValue();
+                for (Object item : avroList) {
+                    bl.add(new BooleanValue((Boolean) item));
+                }
+                return bl;
+            case DOUBLE:
+                DoubleListValue dl = new DoubleListValue();
+                for (Object item : avroList) {
+                    dl.add(new DoubleValue((Double) item));
+                }
+                return dl;
+            case FLOAT:
+                FloatListValue fl = new FloatListValue();
+                for (Object item : avroList) {
+                    fl.add(new FloatValue((Float) item));
+                }
+                return fl;
+            case LONG:
+                LongListValue ll = new LongListValue();
+                for (Object item : avroList) {
+                    ll.add(new LongValue((Long) item));
+                }
+                return ll;
+            default:
+                throw new RuntimeException("Elements of type " + elementType + " are not supported for Avro arrays.");
+        }
+    }
+
+    /**
+     * Copies an Avro map into a newly created map value of the matching value type. Keys are
+     * always converted to {@link StringValue}.
+     *
+     * @throws RuntimeException if the value type is not string, int, boolean, double, float or long
+     */
+    private final MapValue<StringValue, ?> convertAvroMapToMapValue(Type mapValueType, Map<CharSequence, ?> avroMap) {
+        switch (mapValueType) {
+            case STRING:
+                StringMapValue sm = new StringMapValue();
+                for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
+                    // Cast to CharSequence, as the array conversion does: the value is not
+                    // necessarily a java.lang.String (generic Avro readers usually return Utf8).
+                    sm.put(new StringValue((CharSequence) entry.getKey()), new StringValue((CharSequence) entry.getValue()));
+                }
+                return sm;
+            case INT:
+                IntMapValue im = new IntMapValue();
+                for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
+                    im.put(new StringValue((CharSequence) entry.getKey()), new IntValue((Integer) entry.getValue()));
+                }
+                return im;
+            case BOOLEAN:
+                BooleanMapValue bm = new BooleanMapValue();
+                for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
+                    bm.put(new StringValue((CharSequence) entry.getKey()), new BooleanValue((Boolean) entry.getValue()));
+                }
+                return bm;
+            case DOUBLE:
+                DoubleMapValue dm = new DoubleMapValue();
+                for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
+                    dm.put(new StringValue((CharSequence) entry.getKey()), new DoubleValue((Double) entry.getValue()));
+                }
+                return dm;
+            case FLOAT:
+                FloatMapValue fm = new FloatMapValue();
+                for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
+                    fm.put(new StringValue((CharSequence) entry.getKey()), new FloatValue((Float) entry.getValue()));
+                }
+                return fm;
+            case LONG:
+                LongMapValue lm = new LongMapValue();
+                for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
+                    lm.put(new StringValue((CharSequence) entry.getKey()), new LongValue((Long) entry.getValue()));
+                }
+                return lm;
+
+            default:
+                throw new RuntimeException("Map values of type " + mapValueType + " are not supported for Avro map.");
+        }
+    }
+
+    // Reusable holders for primitive and enum values. They are initialized by field initializers
+    // and therefore intentionally not transient.
+    private StringValue sString = new StringValue();
+    private IntValue sInt = new IntValue();
+    private BooleanValue sBool = new BooleanValue();
+    private DoubleValue sDouble = new DoubleValue();
+    private FloatValue sFloat = new FloatValue();
+    private LongValue sLong = new LongValue();
+
+    /**
+     * Stores a primitive Avro value in the reusable holder of the matching type and returns that
+     * holder; NULL maps to the {@link NullValue} singleton.
+     *
+     * @throws RuntimeException for every type without a case below
+     */
+    private final Value convertAvroPrimitiveToValue(Type type, Object avroRecord) {
+        switch (type) {
+            case STRING:
+                sString.setValue((CharSequence) avroRecord);
+                return sString;
+            case INT:
+                sInt.setValue((Integer) avroRecord);
+                return sInt;
+            case BOOLEAN:
+                sBool.setValue((Boolean) avroRecord);
+                return sBool;
+            case DOUBLE:
+                sDouble.setValue((Double) avroRecord);
+                return sDouble;
+            case FLOAT:
+                sFloat.setValue((Float) avroRecord);
+                return sFloat;
+            case LONG:
+                sLong.setValue((Long) avroRecord);
+                return sLong;
+            case NULL:
+                return NullValue.getInstance();
+            default:
+                throw new RuntimeException(
+                    "Type "
+                    + type
+                    + " for AvroInputFormat is not implemented. Open an issue on GitHub.");
+        }
+    }
+
+    /**
+     * Returns the effective type of a field schema. Non-union schemas yield their own type. A union
+     * yields its single member type, or the non-null member of a two-member union containing null.
+     *
+     * @throws RuntimeException for records, and for unions that are empty, nested, have more than
+     *         two members, or have two non-null members
+     */
+    private final Type checkTypeConstraintsAndGetType(final Schema schema) {
+        final Type type = schema.getType();
+        if (type == Type.RECORD) {
+            throw new RuntimeException("The given Avro file contains complex data types which are not supported right now");
+        }
+        if (type != Type.UNION) {
+            return type;
+        }
+
+        final List<Schema> types = schema.getTypes();
+        if (types.size() > 2) {
+            throw new RuntimeException("The given Avro file contains a union that has more than two elements");
+        }
+        // Fail with a descriptive message instead of an IndexOutOfBoundsException further down.
+        if (types.isEmpty()) {
+            throw new RuntimeException("The given Avro file contains an empty union");
+        }
+        for (Schema member : types) {
+            if (member.getType() == Type.UNION) {
+                throw new RuntimeException("The given Avro file contains a nested union");
+            }
+        }
+
+        final Type first = types.get(0).getType();
+        if (types.size() == 1) {
+            return first;
+        }
+        final Type second = types.get(1).getType();
+        if (first == Type.NULL) {
+            return second;
+        }
+        if (second != Type.NULL) {
+            throw new RuntimeException("The given Avro file is contains a union with two non-null types.");
+        }
+        return first;
+    }
+
+    /**
+     * Ignores the requested minimum and asks the superclass for as many splits as there are
+     * accepted files: one for a single file, or the number of accepted non-directory entries when
+     * the input path is a directory.
+     *
+     * @throws IOException if the input path itself does not pass the file filter
+     */
+    @Override
+    public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+        int numAvroFiles = 0;
+        final Path path = this.filePath;
+        // get all the files that are involved in the splits
+        final FileSystem fs = path.getFileSystem();
+        final FileStatus pathFile = fs.getFileStatus(path);
+
+        if (!acceptFile(pathFile)) {
+            throw new IOException("The given file does not pass the file-filter");
+        }
+        if (pathFile.isDir()) {
+            // input is directory. list all contained files
+            for (FileStatus entry : fs.listStatus(path)) {
+                if (!entry.isDir() && acceptFile(entry)) {
+                    numAvroFiles++;
+                }
+            }
+        } else {
+            numAvroFiles = 1;
+        }
+        return super.createInputSplits(numAvroFiles);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Concrete subclasses of ListValue and MapValue for all possible primitive types
+    // --------------------------------------------------------------------------------------------
+
+    public static class StringListValue extends ListValue<StringValue> {
+        private static final long serialVersionUID = 1L;
+    }
+
+    public static class IntListValue extends ListValue<IntValue> {
+        private static final long serialVersionUID = 1L;
+    }
+
+    public static class BooleanListValue extends ListValue<BooleanValue> {
+        private static final long serialVersionUID = 1L;
+    }
+
+    public static class DoubleListValue extends ListValue<DoubleValue> {
+        private static final long serialVersionUID = 1L;
+    }
+
+    public static class FloatListValue extends ListValue<FloatValue> {
+        private static final long serialVersionUID = 1L;
+    }
+
+    public static class LongListValue extends ListValue<LongValue> {
+        private static final long serialVersionUID = 1L;
+    }
+
+    public static class StringMapValue extends MapValue<StringValue, StringValue> {
+        private static final long serialVersionUID = 1L;
+    }
+
+    public static class IntMapValue extends MapValue<StringValue, IntValue> {
+        private static final long serialVersionUID = 1L;
+    }
+
+    public static class BooleanMapValue extends MapValue<StringValue, BooleanValue> {
+        private static final long serialVersionUID = 1L;
+    }
+
+    public static class DoubleMapValue extends MapValue<StringValue, DoubleValue> {
+        private static final long serialVersionUID = 1L;
+    }
+
+    public static class FloatMapValue extends MapValue<StringValue, FloatValue> {
+        private static final long serialVersionUID = 1L;
+    }
+
+    public static class LongMapValue extends MapValue<StringValue, LongValue> {
+        private static final long serialVersionUID = 1L;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
new file mode 100644
index 0000000..9cdaef0
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.record.io.avro.example;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.record.functions.MapFunction;
+import org.apache.flink.api.java.record.functions.ReduceFunction;
+import org.apache.flink.api.java.record.io.GenericInputFormat;
+import org.apache.flink.api.java.record.operators.GenericDataSink;
+import org.apache.flink.api.java.record.operators.GenericDataSource;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.client.LocalExecutor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.Collector;
+
+
+public class ReflectiveAvroTypeExample {
+
+
+ public static void main(String[] args) throws Exception {
+
+ GenericDataSource<UserGeneratingInputFormat> source = new GenericDataSource<UserGeneratingInputFormat>(UserGeneratingInputFormat.class);
+
+ MapOperator mapper = MapOperator.builder(new NumberExtractingMapper())
+ .input(source).name("le mapper").build();
+
+ ReduceOperator reducer = ReduceOperator.builder(new ConcatenatingReducer(), IntValue.class, 1)
+ .input(mapper).name("le reducer").build();
+
+ GenericDataSink sink = new GenericDataSink(PrintingOutputFormat.class, reducer);
+
+ Plan p = new Plan(sink);
+ p.setDefaultParallelism(4);
+
+ LocalExecutor.execute(p);
+ }
+
+
+ public static final class NumberExtractingMapper extends MapFunction implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void map(Record record, Collector<Record> out) throws Exception {
+ User u = record.getField(0, SUser.class).datum();
+ record.setField(1, new IntValue(u.getFavoriteNumber()));
+ out.collect(record);
+ }
+ }
+
+
+ public static final class ConcatenatingReducer extends ReduceFunction implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final Record result = new Record(2);
+
+ @Override
+ public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
+ Record r = records.next();
+
+ int num = r.getField(1, IntValue.class).getValue();
+ String names = r.getField(0, SUser.class).datum().getFavoriteColor().toString();
+
+ while (records.hasNext()) {
+ r = records.next();
+ names += " - " + r.getField(0, SUser.class).datum().getFavoriteColor().toString();
+ }
+
+ result.setField(0, new IntValue(num));
+ result.setField(1, new StringValue(names));
+ out.collect(result);
+ }
+
+ }
+
+
+ public static final class UserGeneratingInputFormat extends GenericInputFormat {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final int NUM = 100;
+
+ private final Random rnd = new Random(32498562304986L);
+
+ private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };
+
+ private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };
+
+ private int count;
+
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ return count >= NUM;
+ }
+
+ @Override
+ public Record nextRecord(Record record) throws IOException {
+ count++;
+
+ User u = new User();
+ u.setName(NAMES[rnd.nextInt(NAMES.length)]);
+ u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
+ u.setFavoriteNumber(rnd.nextInt(87));
+
+ SUser su = new SUser();
+ su.datum(u);
+
+ record.setField(0, su);
+ return record;
+ }
+ }
+
+ public static final class PrintingOutputFormat implements OutputFormat<Record> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void configure(Configuration parameters) {}
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {}
+
+ @Override
+ public void writeRecord(Record record) throws IOException {
+ int color = record.getField(0, IntValue.class).getValue();
+ String names = record.getField(1, StringValue.class).getValue();
+
+ System.out.println(color + ": " + names);
+ }
+
+ @Override
+ public void close() throws IOException {}
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
new file mode 100644
index 0000000..2fdfc05
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.record.io.avro.example;
+
+import org.apache.flink.api.avro.AvroBaseValue;
+
+/**
+ * Value wrapper for {@link User} datums: binds the type parameter of {@link AvroBaseValue} to
+ * {@code User} and adds no members of its own besides the serial version UID.
+ */
+public class SUser extends AvroBaseValue<User> {
+ private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
new file mode 100644
index 0000000..7f48775
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.java.record.io.avro.example;
+/**
+ * Avro-generated specific record with the three fields declared in {@code SCHEMA$}:
+ * {@code name} (string), {@code favorite_number} (int or null) and {@code favorite_color}
+ * (string or null).
+ *
+ * NOTE(review): the namespace in the schema string (org.apache.flink.api.avro.example) differs
+ * from the Java package of this class; confirm that this is intended, since Avro may look up
+ * specific classes by the schema's full name.
+ */
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
+ public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+ // The public fields are marked deprecated; the getters and setters below access the same values.
+ @Deprecated public java.lang.CharSequence name;
+ @Deprecated public java.lang.Integer favorite_number;
+ @Deprecated public java.lang.CharSequence favorite_color;
+
+ /**
+ * Default constructor. Note that this does not initialize fields
+ * to their default values from the schema. If that is desired then
+ * one should use {@link #newBuilder()}.
+ */
+ public User() {}
+
+ /**
+ * All-args constructor.
+ */
+ public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) {
+ this.name = name;
+ this.favorite_number = favorite_number;
+ this.favorite_color = favorite_color;
+ }
+
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call.
+ // Index 0 is name, 1 is favorite_number, 2 is favorite_color; any other index throws.
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0: return name;
+ case 1: return favorite_number;
+ case 2: return favorite_color;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+ // Used by DatumReader. Applications should not call.
+ // Uses the same index mapping as get(int); the value is cast to the field's declared type.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value$) {
+ switch (field$) {
+ case 0: name = (java.lang.CharSequence)value$; break;
+ case 1: favorite_number = (java.lang.Integer)value$; break;
+ case 2: favorite_color = (java.lang.CharSequence)value$; break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets the value of the 'name' field.
+ */
+ public java.lang.CharSequence getName() {
+ return name;
+ }
+
+ /**
+ * Sets the value of the 'name' field.
+ * @param value the value to set.
+ */
+ public void setName(java.lang.CharSequence value) {
+ this.name = value;
+ }
+
+ /**
+ * Gets the value of the 'favorite_number' field.
+ */
+ public java.lang.Integer getFavoriteNumber() {
+ return favorite_number;
+ }
+
+ /**
+ * Sets the value of the 'favorite_number' field.
+ * @param value the value to set.
+ */
+ public void setFavoriteNumber(java.lang.Integer value) {
+ this.favorite_number = value;
+ }
+
+ /**
+ * Gets the value of the 'favorite_color' field.
+ */
+ public java.lang.CharSequence getFavoriteColor() {
+ return favorite_color;
+ }
+
+ /**
+ * Sets the value of the 'favorite_color' field.
+ * @param value the value to set.
+ */
+ public void setFavoriteColor(java.lang.CharSequence value) {
+ this.favorite_color = value;
+ }
+
+ /** Creates a new User RecordBuilder */
+ public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder() {
+ return new org.apache.flink.api.java.record.io.avro.example.User.Builder();
+ }
+
+ /** Creates a new User RecordBuilder by copying an existing Builder */
+ public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
+ return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
+ }
+
+ /** Creates a new User RecordBuilder by copying an existing User instance */
+ public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User other) {
+ return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
+ }
+
+ /**
+ * RecordBuilder for User instances.
+ */
+ public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
+ implements org.apache.avro.data.RecordBuilder<User> {
+
+ private java.lang.CharSequence name;
+ private java.lang.Integer favorite_number;
+ private java.lang.CharSequence favorite_color;
+
+ /** Creates a new Builder */
+ private Builder() {
+ super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
+ }
+
+ /** Creates a Builder by copying an existing Builder */
+ private Builder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
+ super(other);
+ // Each field is deep-copied and flagged as set only if its value passes isValidValue.
+ if (isValidValue(fields()[0], other.name)) {
+ this.name = data().deepCopy(fields()[0].schema(), other.name);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.favorite_number)) {
+ this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+ fieldSetFlags()[1] = true;
+ }
+ if (isValidValue(fields()[2], other.favorite_color)) {
+ this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+ fieldSetFlags()[2] = true;
+ }
+ }
+
+ /** Creates a Builder by copying an existing User instance */
+ private Builder(org.apache.flink.api.java.record.io.avro.example.User other) {
+ super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
+ // Each field is deep-copied and flagged as set only if its value passes isValidValue.
+ if (isValidValue(fields()[0], other.name)) {
+ this.name = data().deepCopy(fields()[0].schema(), other.name);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.favorite_number)) {
+ this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+ fieldSetFlags()[1] = true;
+ }
+ if (isValidValue(fields()[2], other.favorite_color)) {
+ this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+ fieldSetFlags()[2] = true;
+ }
+ }
+
+ /** Gets the value of the 'name' field */
+ public java.lang.CharSequence getName() {
+ return name;
+ }
+
+ /** Sets the value of the 'name' field */
+ public org.apache.flink.api.java.record.io.avro.example.User.Builder setName(java.lang.CharSequence value) {
+ validate(fields()[0], value);
+ this.name = value;
+ fieldSetFlags()[0] = true;
+ return this;
+ }
+
+ /** Checks whether the 'name' field has been set */
+ public boolean hasName() {
+ return fieldSetFlags()[0];
+ }
+
+ /** Clears the value of the 'name' field */
+ public org.apache.flink.api.java.record.io.avro.example.User.Builder clearName() {
+ name = null;
+ fieldSetFlags()[0] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'favorite_number' field */
+ public java.lang.Integer getFavoriteNumber() {
+ return favorite_number;
+ }
+
+ /** Sets the value of the 'favorite_number' field */
+ public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) {
+ validate(fields()[1], value);
+ this.favorite_number = value;
+ fieldSetFlags()[1] = true;
+ return this;
+ }
+
+ /** Checks whether the 'favorite_number' field has been set */
+ public boolean hasFavoriteNumber() {
+ return fieldSetFlags()[1];
+ }
+
+ /** Clears the value of the 'favorite_number' field */
+ public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteNumber() {
+ favorite_number = null;
+ fieldSetFlags()[1] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'favorite_color' field */
+ public java.lang.CharSequence getFavoriteColor() {
+ return favorite_color;
+ }
+
+ /** Sets the value of the 'favorite_color' field */
+ public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) {
+ validate(fields()[2], value);
+ this.favorite_color = value;
+ fieldSetFlags()[2] = true;
+ return this;
+ }
+
+ /** Checks whether the 'favorite_color' field has been set */
+ public boolean hasFavoriteColor() {
+ return fieldSetFlags()[2];
+ }
+
+ /** Clears the value of the 'favorite_color' field */
+ public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteColor() {
+ favorite_color = null;
+ fieldSetFlags()[2] = false;
+ return this;
+ }
+
+ /**
+ * Builds a new User. Fields that were not explicitly set take the value returned by
+ * defaultValue for that field; any exception is rethrown as an AvroRuntimeException.
+ */
+ @Override
+ public User build() {
+ try {
+ User record = new User();
+ record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
+ record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
+ record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
+ return record;
+ } catch (Exception e) {
+ throw new org.apache.avro.AvroRuntimeException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/assembly/test-assembly.xml b/flink-addons/flink-avro/src/test/assembly/test-assembly.xml
new file mode 100644
index 0000000..8316581
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/assembly/test-assembly.xml
@@ -0,0 +1,31 @@
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+<assembly>
+ <id>test-jar</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.testOutputDirectory}</directory>
+ <outputDirectory>/</outputDirectory>
+ <!-- Only the test classes matching the include below are packaged into the jar; modify/add includes to match your package(s). -->
+ <includes>
+ <include>org/apache/flink/api/avro/testjar/**</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
new file mode 100644
index 0000000..4a6e7f1
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+
+import org.apache.flink.client.minicluster.NepheleMiniCluster;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.LogUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Integration test that starts a local {@link NepheleMiniCluster} and submits a packaged
+ * program to it through the {@link Client}.
+ * <p>
+ * The program jar is expected at {@code target/maven-test-jar.jar} (relative to the working
+ * directory), and the Avro input file {@code /testdata.avro} must be on the test classpath.
+ * The location of the input file is passed to the program as its only argument.
+ */
+public class AvroExternalJarProgramITCase {
+
+    /** RPC port on which the test JobManager is started and to which the client connects. */
+    private static final int TEST_JM_PORT = 43191;
+
+    /** Path of the program jar that is submitted to the mini cluster. */
+    private static final String JAR_FILE = "target/maven-test-jar.jar";
+
+    /** Classpath location of the Avro input file handed to the program. */
+    private static final String TEST_DATA_FILE = "/testdata.avro";
+
+    static {
+        LogUtils.initializeDefaultTestConsoleLogger();
+    }
+
+    /**
+     * Runs the packaged program on a mini cluster with four slots and a parallelism of four.
+     * Any error raised while starting the cluster or running the program fails the test.
+     */
+    @Test
+    public void testExternalProgram() {
+
+        // Resolve the input first: a missing resource would otherwise only show up later as a
+        // NullPointerException without a message, after the cluster has already been started.
+        java.net.URL testDataUrl = getClass().getResource(TEST_DATA_FILE);
+        Assert.assertNotNull("Test data file " + TEST_DATA_FILE + " not found on the classpath.", testDataUrl);
+        String testData = testDataUrl.toString();
+
+        NepheleMiniCluster testMiniCluster = null;
+
+        try {
+            testMiniCluster = new NepheleMiniCluster();
+            testMiniCluster.setJobManagerRpcPort(TEST_JM_PORT);
+            testMiniCluster.setTaskManagerNumSlots(4);
+            testMiniCluster.start();
+
+            PackagedProgram program = new PackagedProgram(new File(JAR_FILE), new String[] { testData });
+
+            Client c = new Client(new InetSocketAddress("localhost", TEST_JM_PORT), new Configuration());
+            c.run(program, 4, true);
+        }
+        catch (Throwable t) {
+            t.printStackTrace();
+            Assert.fail("Error during the packaged program execution: " + t.getMessage());
+        }
+        finally {
+            if (testMiniCluster != null) {
+                try {
+                    testMiniCluster.stop();
+                } catch (Throwable t) {
+                    // Shutting down is best effort and must not change the test outcome,
+                    // but the problem is reported instead of being dropped silently.
+                    System.err.println("Could not shut down the test mini cluster: " + t.getMessage());
+                    t.printStackTrace();
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
new file mode 100644
index 0000000..637a5e9
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import org.junit.Assert;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.api.java.record.io.avro.example.User;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+@SuppressWarnings("serial")
+public class AvroOutputFormatTest extends JavaProgramTestBase {
+
+ public static String outputPath1;
+
+ public static String outputPath2;
+
+ public static String inputPath;
+
+ public static String userData = "alice|1|blue\n" +
+ "bob|2|red\n" +
+ "john|3|yellow\n" +
+ "walt|4|black\n";
+
+ @Override
+ protected void preSubmit() throws Exception {
+ inputPath = createTempFile("user", userData);
+ outputPath1 = getTempDirPath("avro_output1");
+ outputPath2 = getTempDirPath("avro_output2");
+ }
+
+
+ @Override
+ protected void testProgram() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
+ .fieldDelimiter('|')
+ .types(String.class, Integer.class, String.class);
+
+ //output the data with AvroOutputFormat for specific user type
+ DataSet<User> specificUser = input.map(new ConvertToUser());
+ specificUser.write(new AvroOutputFormat<User>(User.class), outputPath1);
+
+ //output the data with AvroOutputFormat for reflect user type
+ DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
+ reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
+
+ env.execute();
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ //compare result for specific user type
+ File [] output1;
+ File file1 = asFile(outputPath1);
+ if (file1.isDirectory()) {
+ output1 = file1.listFiles();
+ } else {
+ output1 = new File[] {file1};
+ }
+ List<String> result1 = new ArrayList<String>();
+ DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
+ for (File avroOutput : output1) {
+ DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1);
+ while (dataFileReader1.hasNext()) {
+ User user = dataFileReader1.next();
+ result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
+ }
+ }
+ for (String expectedResult : userData.split("\n")) {
+ Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult));
+ }
+
+ //compare result for reflect user type
+ File [] output2;
+ File file2 = asFile(outputPath2);
+ if (file2.isDirectory()) {
+ output2 = file2.listFiles();
+ } else {
+ output2 = new File[] {file2};
+ }
+ List<String> result2 = new ArrayList<String>();
+ DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
+ for (File avroOutput : output2) {
+ DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2);
+ while (dataFileReader2.hasNext()) {
+ ReflectiveUser user = dataFileReader2.next();
+ result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
+ }
+ }
+ for (String expectedResult : userData.split("\n")) {
+ Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult));
+ }
+
+
+ }
+
+
+ public final static class ConvertToUser extends MapFunction<Tuple3<String, Integer, String>, User> {
+
+ @Override
+ public User map(Tuple3<String, Integer, String> value) throws Exception {
+ return new User(value.f0, value.f1, value.f2);
+ }
+ }
+
+ public final static class ConvertToReflective extends MapFunction<User, ReflectiveUser> {
+
+ @Override
+ public ReflectiveUser map(User value) throws Exception {
+ return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
+ }
+ }
+
+
+ public static class ReflectiveUser {
+ private String name;
+ private int favoriteNumber;
+ private String favoriteColor;
+
+ public ReflectiveUser() {}
+
+ public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
+ this.name = name;
+ this.favoriteNumber = favoriteNumber;
+ this.favoriteColor = favoriteColor;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+ public String getFavoriteColor() {
+ return this.favoriteColor;
+ }
+ public int getFavoriteNumber() {
+ return this.favoriteNumber;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
new file mode 100644
index 0000000..ea9edff
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.avro.reflect.Nullable;
+import org.apache.flink.api.avro.AvroBaseValue;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.record.functions.CoGroupFunction;
+import org.apache.flink.api.java.record.io.GenericInputFormat;
+import org.apache.flink.api.java.record.operators.CoGroupOperator;
+import org.apache.flink.api.java.record.operators.GenericDataSink;
+import org.apache.flink.api.java.record.operators.GenericDataSource;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.RecordAPITestBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * Record API test job that co-groups two single-record inputs whose second field is an
+ * {@link AvroBaseValue}: a {@link Book} on one side and a {@link BookAuthor} with an empty
+ * title list on the other. The co-group function reads both Avro values back from the records.
+ */
+public class AvroWithEmptyArrayITCase extends RecordAPITestBase {
+
+    /**
+     * Assembles the plan: two generated sources, co-grouped on the {@link LongValue} in
+     * field 0, feeding a printing sink. The plan runs with a default parallelism of one.
+     */
+    @Override
+    protected Plan getTestJob() {
+        RandomInputFormat bookFormat = new RandomInputFormat(true);
+        GenericDataSource<RandomInputFormat> bookSource =
+            new GenericDataSource<RandomInputFormat>(bookFormat);
+
+        RandomInputFormat authorFormat = new RandomInputFormat(false);
+        GenericDataSource<RandomInputFormat> authorSource =
+            new GenericDataSource<RandomInputFormat>(authorFormat);
+
+        CoGroupOperator coGroupOperator = CoGroupOperator
+            .builder(MyCoGrouper.class, LongValue.class, 0, 0)
+            .input1(bookSource)
+            .input2(authorSource)
+            .name("CoGrouper Test")
+            .build();
+
+        GenericDataSink sink = new GenericDataSink(PrintingOutputFormat.class, coGroupOperator);
+
+        Plan testPlan = new Plan(sink, "CoGroper Test Plan");
+        testPlan.setDefaultParallelism(1);
+        return testPlan;
+    }
+
+    /** Avro value wrapper that carries a {@link Book}. */
+    public static class SBookAvroValue extends AvroBaseValue<Book> {
+        private static final long serialVersionUID = 1L;
+
+        public SBookAvroValue() {}
+
+        public SBookAvroValue(Book datum) {
+            super(datum);
+        }
+    }
+
+    /** Simple book type with a nullable title. */
+    public static class Book {
+
+        long bookId;
+        @Nullable
+        String title;
+        long authorId;
+
+        public Book() {
+        }
+
+        public Book(long bookId, String title, long authorId) {
+            this.bookId = bookId;
+            this.title = title;
+            this.authorId = authorId;
+        }
+    }
+
+    /** Avro value wrapper that carries a {@link BookAuthor}. */
+    public static class SBookAuthorValue extends AvroBaseValue<BookAuthor> {
+        private static final long serialVersionUID = 1L;
+
+        public SBookAuthorValue() {}
+
+        public SBookAuthorValue(BookAuthor datum) {
+            super(datum);
+        }
+    }
+
+    /** Author type with nullable lists of titles and books, a name and an enum field. */
+    public static class BookAuthor {
+
+        enum BookType {
+            book,
+            article,
+            journal
+        }
+
+        long authorId;
+
+        @Nullable
+        List<String> bookTitles;
+
+        @Nullable
+        List<Book> books;
+
+        String authorName;
+
+        BookType bookType;
+
+        public BookAuthor() {}
+
+        public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
+            this.authorId = authorId;
+            this.bookTitles = bookTitles;
+            this.authorName = authorName;
+        }
+    }
+
+    /**
+     * Input format that produces exactly one record: the key in field 0 and, depending on
+     * {@code isBook}, either a book value or an author value in field 1.
+     */
+    public static class RandomInputFormat extends GenericInputFormat {
+        private static final long serialVersionUID = 1L;
+
+        private final boolean isBook;
+
+        /** Becomes {@code true} once the single record has been handed out. */
+        private boolean touched = false;
+
+        public RandomInputFormat(boolean isBook) {
+            this.isBook = isBook;
+        }
+
+        @Override
+        public boolean reachedEnd() throws IOException {
+            return touched;
+        }
+
+        @Override
+        public Record nextRecord(Record record) throws IOException {
+            touched = true;
+            record.setField(0, new LongValue(26382648));
+
+            if (!isBook) {
+                record.setField(1, new SBookAuthorValue(createAuthor()));
+            } else {
+                record.setField(1, new SBookAvroValue(createBook()));
+            }
+
+            return record;
+        }
+
+        /** Creates the book emitted by the book-side input. */
+        private static Book createBook() {
+            return new Book(123, "This is a test book", 26382648);
+        }
+
+        /** Creates the author emitted by the author-side input; its title list stays empty. */
+        private static BookAuthor createAuthor() {
+            // deliberately left without entries
+            List<String> titles = new ArrayList<String>();
+
+            List<Book> books = new ArrayList<Book>();
+            books.add(new Book(123, "This is a test book", 1));
+            books.add(new Book(24234234, "This is a test book", 1));
+            books.add(new Book(1234324, "This is a test book", 3));
+
+            BookAuthor author = new BookAuthor(1, titles, "Test Author");
+            author.books = books;
+            author.bookType = BookAuthor.BookType.journal;
+            return author;
+        }
+    }
+
+    /** Output format that prints field 0 (as long) and field 1 (as string) of each record to stdout. */
+    public static final class PrintingOutputFormat implements OutputFormat<Record> {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void configure(Configuration parameters) {}
+
+        @Override
+        public void open(int taskNumber, int numTasks) {}
+
+        @Override
+        public void writeRecord(Record record) throws IOException {
+            LongValue keyField = record.getField(0, LongValue.class);
+            StringValue valueField = record.getField(1, StringValue.class);
+            System.out.println(keyField.getValue() + " : " + valueField.getValue());
+        }
+
+        @Override
+        public void close() {}
+    }
+
+    /**
+     * Co-group function that takes the first record of each side, if present, and reads the
+     * Avro datum from field 1. It does not emit anything to the collector.
+     */
+    public static class MyCoGrouper extends CoGroupFunction {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out)
+                throws Exception {
+
+            // fetch from both sides first, then access the values
+            final Record bookRecord = records1.hasNext() ? records1.next() : null;
+            final Record authorRecord = records2.hasNext() ? records2.next() : null;
+
+            if (bookRecord != null) {
+                bookRecord.getField(1, SBookAvroValue.class).datum();
+            }
+
+            if (authorRecord != null) {
+                authorRecord.getField(1, SBookAuthorValue.class).datum();
+            }
+        }
+    }
+}