You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/11/03 17:11:34 UTC
[11/21] flink git commit: [FLINK-7420] [avro] Move all Avro code to flink-avro
[FLINK-7420] [avro] Move all Avro code to flink-avro
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/537a10ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/537a10ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/537a10ea
Branch: refs/heads/master
Commit: 537a10ea2ff6a2d8507483c66f413f77884e77c4
Parents: 2c0fa24
Author: twalthr <tw...@apache.org>
Authored: Wed Aug 16 12:17:00 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 3 16:40:34 2017 +0100
----------------------------------------------------------------------
.gitignore | 2 +-
flink-connectors/flink-avro/pom.xml | 265 ---------
.../apache/flink/api/avro/DataInputDecoder.java | 212 --------
.../flink/api/avro/DataOutputEncoder.java | 180 -------
.../api/avro/FSDataInputStreamWrapper.java | 67 ---
.../flink/api/java/io/AvroInputFormat.java | 207 --------
.../flink/api/java/io/AvroOutputFormat.java | 201 -------
.../src/test/assembly/test-assembly.xml | 36 --
.../api/avro/AvroExternalJarProgramITCase.java | 92 ----
.../flink/api/avro/AvroOutputFormatITCase.java | 177 -------
.../flink/api/avro/EncoderDecoderTest.java | 529 ------------------
.../avro/testjar/AvroExternalJarProgram.java | 211 --------
.../apache/flink/api/io/avro/AvroPojoTest.java | 255 ---------
.../api/io/avro/AvroRecordInputFormatTest.java | 460 ----------------
.../io/avro/AvroSplittableInputFormatTest.java | 325 ------------
.../api/io/avro/example/AvroTypeExample.java | 106 ----
.../apache/flink/api/io/avro/example/User.java | 269 ----------
.../io/AvroInputFormatTypeExtractionTest.java | 86 ---
.../flink/api/java/io/AvroOutputFormatTest.java | 197 -------
.../api/java/typeutils/AvroTypeInfoTest.java | 37 --
.../src/test/resources/avro/user.avsc | 35 --
.../src/test/resources/log4j-test.properties | 27 -
.../flink-avro/src/test/resources/testdata.avro | Bin 4572 -> 0 bytes
.../flink-connector-filesystem/pom.xml | 9 +
.../connectors/fs/AvroKeyValueSinkWriter.java | 2 +-
.../flink-connector-kafka-0.10/pom.xml | 12 +-
.../flink-connector-kafka-0.11/pom.xml | 12 +-
.../flink-connector-kafka-0.8/pom.xml | 12 +-
.../flink-connector-kafka-0.9/pom.xml | 12 +-
.../flink-connector-kafka-base/pom.xml | 31 +-
.../AvroRowDeserializationSchema.java | 179 -------
.../AvroRowSerializationSchema.java | 149 ------
.../kafka/AvroRowDeSerializationSchemaTest.java | 148 ------
.../kafka/KafkaAvroTableSourceTestBase.java | 2 +-
.../kafka/testutils/AvroTestUtils.java | 152 ------
flink-connectors/pom.xml | 1 -
flink-core/pom.xml | 9 +-
.../flink/api/common/ExecutionConfig.java | 10 +-
.../flink/api/java/typeutils/AvroTypeInfo.java | 84 ---
.../flink/api/java/typeutils/PojoTypeInfo.java | 26 +-
.../api/java/typeutils/TypeExtractionUtils.java | 35 ++
.../flink/api/java/typeutils/TypeExtractor.java | 64 ++-
.../java/typeutils/runtime/AvroSerializer.java | 332 ------------
.../typeutils/runtime/DataInputDecoder.java | 230 --------
.../typeutils/runtime/DataOutputEncoder.java | 191 -------
.../typeutils/runtime/kryo/KryoSerializer.java | 11 +-
.../typeutils/runtime/kryo/Serializers.java | 134 +++--
.../runtime/AvroGenericArraySerializerTest.java | 28 -
.../runtime/AvroGenericTypeComparatorTest.java | 28 -
.../runtime/AvroGenericTypeSerializerTest.java | 29 -
.../runtime/AvroSerializerEmptyArrayTest.java | 189 -------
.../kryo/KryoSerializerCompatibilityTest.java | 28 +-
.../resources/kryo-serializer-flink1.3-snapshot | Bin 0 -> 1305 bytes
.../scala/examples/twitter/TwitterExample.scala | 3 +-
flink-formats/flink-avro/pom.xml | 280 ++++++++++
.../flink/api/java/typeutils/AvroTypeInfo.java | 38 ++
.../flink/formats/avro/AvroInputFormat.java | 207 ++++++++
.../flink/formats/avro/AvroOutputFormat.java | 201 +++++++
.../avro/AvroRowDeserializationSchema.java | 179 +++++++
.../avro/AvroRowSerializationSchema.java | 149 ++++++
.../formats/avro/typeutils/AvroSerializer.java | 338 ++++++++++++
.../formats/avro/typeutils/AvroTypeInfo.java | 100 ++++
.../avro/utils/AvroKryoSerializerUtils.java | 72 +++
.../formats/avro/utils/DataInputDecoder.java | 212 ++++++++
.../formats/avro/utils/DataOutputEncoder.java | 180 +++++++
.../avro/utils/FSDataInputStreamWrapper.java | 67 +++
.../src/test/assembly/test-assembly.xml | 36 ++
.../avro/AvroExternalJarProgramITCase.java | 92 ++++
.../avro/AvroInputFormatTypeExtractionTest.java | 86 +++
.../formats/avro/AvroOutputFormatITCase.java | 188 +++++++
.../formats/avro/AvroOutputFormatTest.java | 207 ++++++++
.../formats/avro/AvroRecordInputFormatTest.java | 459 ++++++++++++++++
.../avro/AvroRowDeSerializationSchemaTest.java | 146 +++++
.../avro/AvroSplittableInputFormatTest.java | 324 +++++++++++
.../flink/formats/avro/EncoderDecoderTest.java | 531 +++++++++++++++++++
.../avro/testjar/AvroExternalJarProgram.java | 211 ++++++++
.../AvroGenericArraySerializerTest.java | 33 ++
.../AvroGenericTypeComparatorTest.java | 33 ++
.../AvroGenericTypeSerializerTest.java | 33 ++
.../typeutils/AvroSerializerEmptyArrayTest.java | 217 ++++++++
.../avro/typeutils/AvroTypeExtractionTest.java | 257 +++++++++
.../avro/typeutils/AvroTypeInfoTest.java | 37 ++
.../flink/formats/avro/utils/AvroTestUtils.java | 152 ++++++
.../src/test/resources/avro/user.avsc | 35 ++
.../src/test/resources/log4j-test.properties | 27 +
.../flink-avro/src/test/resources/testdata.avro | Bin 0 -> 4576 bytes
flink-formats/pom.xml | 42 ++
.../org/apache/flink/hdfstests/HDFSTest.java | 35 +-
flink-libraries/flink-cep/pom.xml | 8 +-
.../flink-shaded-hadoop2/pom.xml | 12 +
flink-shaded-hadoop/pom.xml | 5 -
flink-tests/pom.xml | 15 -
pom.xml | 21 +-
tools/maven/suppressions.xml | 3 +-
tools/travis_mvn_watchdog.sh | 2 +-
95 files changed, 5488 insertions(+), 5910 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 3c9e4e8..8fc9fce 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,7 +17,7 @@ tmp
*.log
.DS_Store
build-target
-flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/
+flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/generated/
flink-runtime-web/web-dashboard/assets/fonts/
flink-runtime-web/web-dashboard/node_modules/
flink-runtime-web/web-dashboard/bower_components/
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/pom.xml b/flink-connectors/flink-avro/pom.xml
deleted file mode 100644
index f8d9293..0000000
--- a/flink-connectors/flink-avro/pom.xml
+++ /dev/null
@@ -1,265 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connectors</artifactId>
- <version>1.4-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-avro_${scala.binary.version}</artifactId>
- <name>flink-avro</name>
-
- <packaging>jar</packaging>
-
- <dependencies>
-
- <!-- core dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- <!-- version is derived from base module -->
- </dependency>
-
- <!-- test dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils-junit</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <executions>
- <execution>
- <id>create-test-dependency</id>
- <phase>process-test-classes</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <archive>
- <manifest>
- <mainClass>org.apache.flink.api.avro.testjar.AvroExternalJarProgram</mainClass>
- </manifest>
- </archive>
- <finalName>maven</finalName>
- <attach>false</attach>
- <descriptors>
- <descriptor>src/test/assembly/test-assembly.xml</descriptor>
- </descriptors>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!--Remove the AvroExternalJarProgram code from the test-classes directory since it musn't be in the
- classpath when running the tests to actually test whether the user code class loader
- is properly used.-->
- <plugin>
- <artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version><!--$NO-MVN-MAN-VER$-->
- <executions>
- <execution>
- <id>remove-avroexternalprogram</id>
- <phase>process-test-classes</phase>
- <goals>
- <goal>clean</goal>
- </goals>
- <configuration>
- <excludeDefaultDirectories>true</excludeDefaultDirectories>
- <filesets>
- <fileset>
- <directory>${project.build.testOutputDirectory}</directory>
- <includes>
- <include>**/testjar/*.class</include>
- </includes>
- </fileset>
- </filesets>
- </configuration>
- </execution>
- </executions>
- <configuration>
- <filesets>
- <fileset>
- <directory>${project.basedir}/src/test/java/org/apache/flink/api/io/avro/generated</directory>
- </fileset>
- </filesets>
- </configuration>
- </plugin>
- <!-- Generate Test class from avro schema -->
- <plugin>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-maven-plugin</artifactId>
- <version>1.8.2</version>
- <executions>
- <execution>
- <phase>generate-sources</phase>
- <goals>
- <goal>schema</goal>
- </goals>
- <configuration>
- <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
- <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- Add Avro test classes to test jar in order to test AvroTypeInfo. -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <id>shade-flink</id>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration combine.self="override">
- <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
- <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
- <artifactSet>
- <includes>
- <include>org.codehaus.jackson:*</include>
- </includes>
- </artifactSet>
- <relocations>
- <relocation>
- <pattern>org.codehaus.jackson</pattern>
- <shadedPattern>org.apache.flink.avro.shaded.org.codehouse.jackson</shadedPattern>
- </relocation>
- </relocations>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
-
- <pluginManagement>
- <plugins>
- <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <versionRange>[2.4,)</versionRange>
- <goals>
- <goal>single</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-clean-plugin</artifactId>
- <versionRange>[1,)</versionRange>
- <goals>
- <goal>clean</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-maven-plugin</artifactId>
- <versionRange>[1.7.7,)</versionRange>
- <goals>
- <goal>schema</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
deleted file mode 100644
index 870d66f..0000000
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import org.apache.avro.io.Decoder;
-import org.apache.avro.util.Utf8;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-/**
- * A {@link Decoder} that reads from a {@link DataInput}.
- */
-public class DataInputDecoder extends Decoder {
-
- private final Utf8 stringDecoder = new Utf8();
-
- private DataInput in;
-
- public void setIn(DataInput in) {
- this.in = in;
- }
-
- // --------------------------------------------------------------------------------------------
- // primitives
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void readNull() {}
-
- @Override
- public boolean readBoolean() throws IOException {
- return in.readBoolean();
- }
-
- @Override
- public int readInt() throws IOException {
- return in.readInt();
- }
-
- @Override
- public long readLong() throws IOException {
- return in.readLong();
- }
-
- @Override
- public float readFloat() throws IOException {
- return in.readFloat();
- }
-
- @Override
- public double readDouble() throws IOException {
- return in.readDouble();
- }
-
- @Override
- public int readEnum() throws IOException {
- return readInt();
- }
-
- // --------------------------------------------------------------------------------------------
- // bytes
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void readFixed(byte[] bytes, int start, int length) throws IOException {
- in.readFully(bytes, start, length);
- }
-
- @Override
- public ByteBuffer readBytes(ByteBuffer old) throws IOException {
- int length = readInt();
- ByteBuffer result;
- if (old != null && length <= old.capacity() && old.hasArray()) {
- result = old;
- result.clear();
- } else {
- result = ByteBuffer.allocate(length);
- }
- in.readFully(result.array(), result.arrayOffset() + result.position(), length);
- result.limit(length);
- return result;
- }
-
- @Override
- public void skipFixed(int length) throws IOException {
- skipBytes(length);
- }
-
- @Override
- public void skipBytes() throws IOException {
- int num = readInt();
- skipBytes(num);
- }
-
- // --------------------------------------------------------------------------------------------
- // strings
- // --------------------------------------------------------------------------------------------
-
- @Override
- public Utf8 readString(Utf8 old) throws IOException {
- int length = readInt();
- Utf8 result = (old != null ? old : new Utf8());
- result.setByteLength(length);
-
- if (length > 0) {
- in.readFully(result.getBytes(), 0, length);
- }
-
- return result;
- }
-
- @Override
- public String readString() throws IOException {
- return readString(stringDecoder).toString();
- }
-
- @Override
- public void skipString() throws IOException {
- int len = readInt();
- skipBytes(len);
- }
-
- // --------------------------------------------------------------------------------------------
- // collection types
- // --------------------------------------------------------------------------------------------
-
- @Override
- public long readArrayStart() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long arrayNext() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long skipArray() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long readMapStart() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long mapNext() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long skipMap() throws IOException {
- return readVarLongCount(in);
- }
-
- // --------------------------------------------------------------------------------------------
- // union
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int readIndex() throws IOException {
- return readInt();
- }
-
- // --------------------------------------------------------------------------------------------
- // utils
- // --------------------------------------------------------------------------------------------
-
- private void skipBytes(int num) throws IOException {
- while (num > 0) {
- num -= in.skipBytes(num);
- }
- }
-
- public static long readVarLongCount(DataInput in) throws IOException {
- long value = in.readUnsignedByte();
-
- if ((value & 0x80) == 0) {
- return value;
- }
- else {
- long curr;
- int shift = 7;
- value = value & 0x7f;
- while (((curr = in.readUnsignedByte()) & 0x80) != 0){
- value |= (curr & 0x7f) << shift;
- shift += 7;
- }
- value |= curr << shift;
- return value;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
deleted file mode 100644
index beae330..0000000
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import org.apache.avro.io.Encoder;
-import org.apache.avro.util.Utf8;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-/**
- * An {@link Encoder} that writes data to a {@link DataOutput}.
- */
-public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private DataOutput out;
-
- public void setOut(DataOutput out) {
- this.out = out;
- }
-
- @Override
- public void flush() throws IOException {}
-
- // --------------------------------------------------------------------------------------------
- // primitives
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void writeNull() {}
-
- @Override
- public void writeBoolean(boolean b) throws IOException {
- out.writeBoolean(b);
- }
-
- @Override
- public void writeInt(int n) throws IOException {
- out.writeInt(n);
- }
-
- @Override
- public void writeLong(long n) throws IOException {
- out.writeLong(n);
- }
-
- @Override
- public void writeFloat(float f) throws IOException {
- out.writeFloat(f);
- }
-
- @Override
- public void writeDouble(double d) throws IOException {
- out.writeDouble(d);
- }
-
- @Override
- public void writeEnum(int e) throws IOException {
- out.writeInt(e);
- }
-
- // --------------------------------------------------------------------------------------------
- // bytes
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void writeFixed(byte[] bytes, int start, int len) throws IOException {
- out.write(bytes, start, len);
- }
-
- @Override
- public void writeBytes(byte[] bytes, int start, int len) throws IOException {
- out.writeInt(len);
- if (len > 0) {
- out.write(bytes, start, len);
- }
- }
-
- @Override
- public void writeBytes(ByteBuffer bytes) throws IOException {
- int num = bytes.remaining();
- out.writeInt(num);
-
- if (num > 0) {
- writeFixed(bytes);
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // strings
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void writeString(String str) throws IOException {
- byte[] bytes = Utf8.getBytesFor(str);
- writeBytes(bytes, 0, bytes.length);
- }
-
- @Override
- public void writeString(Utf8 utf8) throws IOException {
- writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
-
- }
-
- // --------------------------------------------------------------------------------------------
- // collection types
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void writeArrayStart() {}
-
- @Override
- public void setItemCount(long itemCount) throws IOException {
- if (itemCount > 0) {
- writeVarLongCount(out, itemCount);
- }
- }
-
- @Override
- public void startItem() {}
-
- @Override
- public void writeArrayEnd() throws IOException {
- // write a single byte 0, shortcut for a var-length long of 0
- out.write(0);
- }
-
- @Override
- public void writeMapStart() {}
-
- @Override
- public void writeMapEnd() throws IOException {
- // write a single byte 0, shortcut for a var-length long of 0
- out.write(0);
- }
-
- // --------------------------------------------------------------------------------------------
- // union
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void writeIndex(int unionIndex) throws IOException {
- out.writeInt(unionIndex);
- }
-
- // --------------------------------------------------------------------------------------------
- // utils
- // --------------------------------------------------------------------------------------------
-
- public static void writeVarLongCount(DataOutput out, long val) throws IOException {
- if (val < 0) {
- throw new IOException("Illegal count (must be non-negative): " + val);
- }
-
- while ((val & ~0x7FL) != 0) {
- out.write(((int) val) | 0x80);
- val >>>= 7;
- }
- out.write((int) val);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
deleted file mode 100644
index 19e4a89..0000000
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-
-import org.apache.avro.file.SeekableInput;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well).
- *
- * <p>The wrapper keeps track of the position in the data stream.
- */
-public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
- private final FSDataInputStream stream;
- private long pos;
- private long len;
-
- public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
- this.stream = stream;
- this.pos = 0;
- this.len = len;
- }
-
- public long length() throws IOException {
- return this.len;
- }
-
- public int read(byte[] b, int off, int len) throws IOException {
- int read;
- read = stream.read(b, off, len);
- pos += read;
- return read;
- }
-
- public void seek(long p) throws IOException {
- stream.seek(p);
- pos = p;
- }
-
- public long tell() throws IOException {
- return pos;
- }
-
- public void close() throws IOException {
- stream.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
deleted file mode 100644
index 33105cc..0000000
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.common.io.CheckpointableInputFormat;
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * Provides a {@link FileInputFormat} for Avro records.
- *
- * @param <E>
- * the type of the result Avro record. If you specify
- * {@link GenericRecord} then the result will be returned as a
- * {@link GenericRecord}, so you do not have to know the schema ahead
- * of time.
- */
-public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E>,
- CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
-
- private final Class<E> avroValueType;
-
- private boolean reuseAvroValue = true;
-
- private transient DataFileReader<E> dataFileReader;
-
- private transient long end;
-
- private transient long recordsReadSinceLastSync;
-
- private long lastSync = -1L;
-
- public AvroInputFormat(Path filePath, Class<E> type) {
- super(filePath);
- this.avroValueType = type;
- }
-
- /**
- * Sets the flag whether to reuse the Avro value instance for all records.
- * By default, the input format reuses the Avro value.
- *
- * @param reuseAvroValue True, if the input format should reuse the Avro value instance, false otherwise.
- */
- public void setReuseAvroValue(boolean reuseAvroValue) {
- this.reuseAvroValue = reuseAvroValue;
- }
-
- /**
- * If set, the InputFormat will only read entire files.
- */
- public void setUnsplittable(boolean unsplittable) {
- this.unsplittable = unsplittable;
- }
-
- // --------------------------------------------------------------------------------------------
- // Typing
- // --------------------------------------------------------------------------------------------
-
- @Override
- public TypeInformation<E> getProducedType() {
- return TypeExtractor.getForClass(this.avroValueType);
- }
-
- // --------------------------------------------------------------------------------------------
- // Input Format Methods
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void open(FileInputSplit split) throws IOException {
- super.open(split);
- dataFileReader = initReader(split);
- dataFileReader.sync(split.getStart());
- lastSync = dataFileReader.previousSync();
- }
-
- private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
- DatumReader<E> datumReader;
-
- if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
- datumReader = new GenericDatumReader<E>();
- } else {
- datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
- ? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("Opening split {}", split);
- }
-
- SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
- DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
- }
-
- end = split.getStart() + split.getLength();
- recordsReadSinceLastSync = 0;
- return dataFileReader;
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- return !dataFileReader.hasNext() || dataFileReader.pastSync(end);
- }
-
- public long getRecordsReadFromBlock() {
- return this.recordsReadSinceLastSync;
- }
-
- @Override
- public E nextRecord(E reuseValue) throws IOException {
- if (reachedEnd()) {
- return null;
- }
-
- // if we start a new block, then register the event, and
- // restart the counter.
- if (dataFileReader.previousSync() != lastSync) {
- lastSync = dataFileReader.previousSync();
- recordsReadSinceLastSync = 0;
- }
- recordsReadSinceLastSync++;
-
- if (reuseAvroValue) {
- return dataFileReader.next(reuseValue);
- } else {
- if (GenericRecord.class == avroValueType) {
- return dataFileReader.next();
- } else {
- return dataFileReader.next(InstantiationUtil.instantiate(avroValueType, Object.class));
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Checkpointing
- // --------------------------------------------------------------------------------------------
-
- @Override
- public Tuple2<Long, Long> getCurrentState() throws IOException {
- return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
- }
-
- @Override
- public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
- Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
- Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
-
- try {
- this.open(split);
- } finally {
- if (state.f0 != -1) {
- lastSync = state.f0;
- recordsReadSinceLastSync = state.f1;
- }
- }
-
- if (lastSync != -1) {
- // open and read until the record we were before
- // the checkpoint and discard the values
- dataFileReader.seek(lastSync);
- for (int i = 0; i < recordsReadSinceLastSync; i++) {
- dataFileReader.next(null);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
deleted file mode 100644
index 5da8f75..0000000
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.fs.Path;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * {@link FileOutputFormat} for Avro records.
- * @param <E>
- */
-public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
-
- /**
- * Wrapper which encapsulates the supported codec and a related serialization byte.
- */
- public enum Codec {
-
- NULL((byte) 0, CodecFactory.nullCodec()),
- SNAPPY((byte) 1, CodecFactory.snappyCodec()),
- BZIP2((byte) 2, CodecFactory.bzip2Codec()),
- DEFLATE((byte) 3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
- XZ((byte) 4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));
-
- private byte codecByte;
-
- private CodecFactory codecFactory;
-
- Codec(final byte codecByte, final CodecFactory codecFactory) {
- this.codecByte = codecByte;
- this.codecFactory = codecFactory;
- }
-
- private byte getCodecByte() {
- return codecByte;
- }
-
- private CodecFactory getCodecFactory() {
- return codecFactory;
- }
-
- private static Codec forCodecByte(byte codecByte) {
- for (final Codec codec : Codec.values()) {
- if (codec.getCodecByte() == codecByte) {
- return codec;
- }
- }
- throw new IllegalArgumentException("no codec for codecByte: " + codecByte);
- }
- }
-
- private static final long serialVersionUID = 1L;
-
- private final Class<E> avroValueType;
-
- private transient Schema userDefinedSchema = null;
-
- private transient Codec codec = null;
-
- private transient DataFileWriter<E> dataFileWriter;
-
- public AvroOutputFormat(Path filePath, Class<E> type) {
- super(filePath);
- this.avroValueType = type;
- }
-
- public AvroOutputFormat(Class<E> type) {
- this.avroValueType = type;
- }
-
- @Override
- protected String getDirectoryFileName(int taskNumber) {
- return super.getDirectoryFileName(taskNumber) + ".avro";
- }
-
- public void setSchema(Schema schema) {
- this.userDefinedSchema = schema;
- }
-
- /**
- * Set avro codec for compression.
- *
- * @param codec avro codec.
- */
- public void setCodec(final Codec codec) {
- this.codec = checkNotNull(codec, "codec can not be null");
- }
-
- @Override
- public void writeRecord(E record) throws IOException {
- dataFileWriter.append(record);
- }
-
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- super.open(taskNumber, numTasks);
-
- DatumWriter<E> datumWriter;
- Schema schema;
- if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
- datumWriter = new SpecificDatumWriter<E>(avroValueType);
- try {
- schema = ((org.apache.avro.specific.SpecificRecordBase) avroValueType.newInstance()).getSchema();
- } catch (InstantiationException | IllegalAccessException e) {
- throw new RuntimeException(e.getMessage());
- }
- } else if (org.apache.avro.generic.GenericRecord.class.isAssignableFrom(avroValueType)) {
- if (userDefinedSchema == null) {
- throw new IllegalStateException("Schema must be set when using Generic Record");
- }
- datumWriter = new GenericDatumWriter<E>(userDefinedSchema);
- schema = userDefinedSchema;
- } else {
- datumWriter = new ReflectDatumWriter<E>(avroValueType);
- schema = ReflectData.get().getSchema(avroValueType);
- }
- dataFileWriter = new DataFileWriter<E>(datumWriter);
- if (codec != null) {
- dataFileWriter.setCodec(codec.getCodecFactory());
- }
- if (userDefinedSchema == null) {
- dataFileWriter.create(schema, stream);
- } else {
- dataFileWriter.create(userDefinedSchema, stream);
- }
- }
-
- private void writeObject(java.io.ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
-
- if (codec != null) {
- out.writeByte(codec.getCodecByte());
- } else {
- out.writeByte(-1);
- }
-
- if (userDefinedSchema != null) {
- byte[] json = userDefinedSchema.toString().getBytes(ConfigConstants.DEFAULT_CHARSET);
- out.writeInt(json.length);
- out.write(json);
- } else {
- out.writeInt(0);
- }
- }
-
- private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
-
- byte codecByte = in.readByte();
- if (codecByte >= 0) {
- setCodec(Codec.forCodecByte(codecByte));
- }
-
- int length = in.readInt();
- if (length != 0) {
- byte[] json = new byte[length];
- in.readFully(json);
-
- Schema schema = new Schema.Parser().parse(new String(json, ConfigConstants.DEFAULT_CHARSET));
- setSchema(schema);
- }
- }
-
- @Override
- public void close() throws IOException {
- dataFileWriter.flush();
- dataFileWriter.close();
- super.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml b/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
deleted file mode 100644
index 0cbdbe1..0000000
--- a/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<assembly>
- <id>test-jar</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>${project.build.testOutputDirectory}</directory>
- <outputDirectory></outputDirectory>
- <!-- Only the compiled test classes matched below go into the jar; modify/add includes to match your package(s). -->
- <includes>
- <include>org/apache/flink/api/avro/testjar/**</include>
- </includes>
- </fileSet>
- </fileSets>
-</assembly>
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
deleted file mode 100644
index 6133778..0000000
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import org.apache.flink.api.avro.testjar.AvroExternalJarProgram;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.test.util.TestEnvironment;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.File;
-import java.net.URL;
-import java.util.Collections;
-
-/**
- * IT case for the {@link AvroExternalJarProgram}.
- */
-public class AvroExternalJarProgramITCase extends TestLogger {
-
- private static final String JAR_FILE = "maven-test-jar.jar";
-
- private static final String TEST_DATA_FILE = "/testdata.avro";
-
- @Test
- public void testExternalProgram() {
-
- LocalFlinkMiniCluster testMiniCluster = null;
-
- try {
- int parallelism = 4;
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
- testMiniCluster = new LocalFlinkMiniCluster(config, false);
- testMiniCluster.start();
-
- String jarFile = JAR_FILE;
- String testData = getClass().getResource(TEST_DATA_FILE).toString();
-
- PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
-
- TestEnvironment.setAsContext(
- testMiniCluster,
- parallelism,
- Collections.singleton(new Path(jarFile)),
- Collections.<URL>emptyList());
-
- config.setString(JobManagerOptions.ADDRESS, "localhost");
- config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
-
- program.invokeInteractiveModeForExecution();
- }
- catch (Throwable t) {
- System.err.println(t.getMessage());
- t.printStackTrace();
- Assert.fail("Error during the packaged program execution: " + t.getMessage());
- }
- finally {
- TestEnvironment.unsetAsContext();
-
- if (testMiniCluster != null) {
- try {
- testMiniCluster.stop();
- } catch (Throwable t) {
- // ignore
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
deleted file mode 100644
index f630f41..0000000
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.io.avro.example.User;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.junit.Assert;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * IT cases for the {@link AvroOutputFormat}.
- *
- * <p>Writes the same user data once as Avro specific records and once as reflect records,
- * then reads the written files back and checks that every input row is present.
- */
-@SuppressWarnings("serial")
-public class AvroOutputFormatITCase extends JavaProgramTestBase {
-
-	public static String outputPath1;
-
-	public static String outputPath2;
-
-	public static String inputPath;
-
-	public static String userData = "alice|1|blue\n" +
-		"bob|2|red\n" +
-		"john|3|yellow\n" +
-		"walt|4|black\n";
-
-	@Override
-	protected void preSubmit() throws Exception {
-		inputPath = createTempFile("user", userData);
-		outputPath1 = getTempDirPath("avro_output1");
-		outputPath2 = getTempDirPath("avro_output2");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
-			.fieldDelimiter("|")
-			.types(String.class, Integer.class, String.class);
-
-		//output the data with AvroOutputFormat for specific user type
-		DataSet<User> specificUser = input.map(new ConvertToUser());
-		AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<User>(User.class);
-		avroOutputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY); // FLINK-4771: use a codec
-		avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure the OF is properly serializing the schema
-		specificUser.write(avroOutputFormat, outputPath1);
-
-		//output the data with AvroOutputFormat for reflect user type
-		DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
-		reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
-
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		//compare result for specific user type
-		File file1 = asFile(outputPath1);
-		File[] output1 = listOutputFiles(file1);
-		if (file1.isDirectory()) {
-			// check for avro ext in dir.
-			for (File avroOutput : output1) {
-				Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
-			}
-		}
-		List<String> result1 = new ArrayList<String>();
-		DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
-		for (File avroOutput : output1) {
-			// try-with-resources: the reader holds an open file handle
-			try (DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1)) {
-				while (dataFileReader1.hasNext()) {
-					User user = dataFileReader1.next();
-					result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
-				}
-			}
-		}
-		for (String expectedResult : userData.split("\n")) {
-			Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult));
-		}
-
-		//compare result for reflect user type
-		File[] output2 = listOutputFiles(asFile(outputPath2));
-		List<String> result2 = new ArrayList<String>();
-		DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
-		for (File avroOutput : output2) {
-			try (DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2)) {
-				while (dataFileReader2.hasNext()) {
-					ReflectiveUser user = dataFileReader2.next();
-					result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
-				}
-			}
-		}
-		for (String expectedResult : userData.split("\n")) {
-			Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult));
-		}
-
-	}
-
-	/**
-	 * Returns the files to verify: the directory entries if the output is a directory,
-	 * otherwise the output file itself.
-	 */
-	private static File[] listOutputFiles(File output) {
-		if (!output.isDirectory()) {
-			return new File[] {output};
-		}
-		File[] children = output.listFiles();
-		// listFiles() returns null on an I/O error; fail with a clear message instead of an NPE
-		Assert.assertNotNull("Could not list output directory " + output, children);
-		return children;
-	}
-
-	private static final class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
-
-		@Override
-		public User map(Tuple3<String, Integer, String> value) throws Exception {
-			return new User(value.f0, value.f1, value.f2);
-		}
-	}
-
-	private static final class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
-
-		@Override
-		public ReflectiveUser map(User value) throws Exception {
-			return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
-		}
-	}
-
-	/**
-	 * Plain class without Avro-generated code, written with the reflect writer.
-	 */
-	private static class ReflectiveUser {
-		private String name;
-		private int favoriteNumber;
-		private String favoriteColor;
-
-		public ReflectiveUser() {}
-
-		public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
-			this.name = name;
-			this.favoriteNumber = favoriteNumber;
-			this.favoriteColor = favoriteColor;
-		}
-
-		public String getName() {
-			return this.name;
-		}
-
-		public String getFavoriteColor() {
-			return this.favoriteColor;
-		}
-
-		public int getFavoriteNumber() {
-			return this.favoriteNumber;
-		}
-	}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
deleted file mode 100644
index 808c257..0000000
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import org.apache.flink.api.io.avro.generated.Address;
-import org.apache.flink.api.io.avro.generated.Colors;
-import org.apache.flink.api.io.avro.generated.Fixed16;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.util.StringUtils;
-
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization.
- */
-public class EncoderDecoderTest {
- @Test
- public void testComplexStringsDirecty() {
- try {
- Random rnd = new Random(349712539451944123L);
-
- for (int i = 0; i < 10; i++) {
- String testString = StringUtils.getRandomString(rnd, 10, 100);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
- {
- DataOutputStream dataOut = new DataOutputStream(baos);
- DataOutputEncoder encoder = new DataOutputEncoder();
- encoder.setOut(dataOut);
-
- encoder.writeString(testString);
- dataOut.flush();
- dataOut.close();
- }
-
- byte[] data = baos.toByteArray();
-
- // deserialize
- {
- ByteArrayInputStream bais = new ByteArrayInputStream(data);
- DataInputStream dataIn = new DataInputStream(bais);
- DataInputDecoder decoder = new DataInputDecoder();
- decoder.setIn(dataIn);
-
- String deserialized = decoder.readString();
-
- assertEquals(testString, deserialized);
- }
- }
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test failed due to an exception: " + e.getMessage());
- }
- }
-
- @Test
- public void testPrimitiveTypes() {
-
- testObjectSerialization(new Boolean(true));
- testObjectSerialization(new Boolean(false));
-
- testObjectSerialization(Byte.valueOf((byte) 0));
- testObjectSerialization(Byte.valueOf((byte) 1));
- testObjectSerialization(Byte.valueOf((byte) -1));
- testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE));
- testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE));
-
- testObjectSerialization(Short.valueOf((short) 0));
- testObjectSerialization(Short.valueOf((short) 1));
- testObjectSerialization(Short.valueOf((short) -1));
- testObjectSerialization(Short.valueOf(Short.MIN_VALUE));
- testObjectSerialization(Short.valueOf(Short.MAX_VALUE));
-
- testObjectSerialization(Integer.valueOf(0));
- testObjectSerialization(Integer.valueOf(1));
- testObjectSerialization(Integer.valueOf(-1));
- testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE));
- testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE));
-
- testObjectSerialization(Long.valueOf(0));
- testObjectSerialization(Long.valueOf(1));
- testObjectSerialization(Long.valueOf(-1));
- testObjectSerialization(Long.valueOf(Long.MIN_VALUE));
- testObjectSerialization(Long.valueOf(Long.MAX_VALUE));
-
- testObjectSerialization(Float.valueOf(0));
- testObjectSerialization(Float.valueOf(1));
- testObjectSerialization(Float.valueOf(-1));
- testObjectSerialization(Float.valueOf((float) Math.E));
- testObjectSerialization(Float.valueOf((float) Math.PI));
- testObjectSerialization(Float.valueOf(Float.MIN_VALUE));
- testObjectSerialization(Float.valueOf(Float.MAX_VALUE));
- testObjectSerialization(Float.valueOf(Float.MIN_NORMAL));
- testObjectSerialization(Float.valueOf(Float.NaN));
- testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY));
- testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY));
-
- testObjectSerialization(Double.valueOf(0));
- testObjectSerialization(Double.valueOf(1));
- testObjectSerialization(Double.valueOf(-1));
- testObjectSerialization(Double.valueOf(Math.E));
- testObjectSerialization(Double.valueOf(Math.PI));
- testObjectSerialization(Double.valueOf(Double.MIN_VALUE));
- testObjectSerialization(Double.valueOf(Double.MAX_VALUE));
- testObjectSerialization(Double.valueOf(Double.MIN_NORMAL));
- testObjectSerialization(Double.valueOf(Double.NaN));
- testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY));
- testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY));
-
- testObjectSerialization("");
- testObjectSerialization("abcdefg");
- testObjectSerialization("ab\u1535\u0155xyz\u706F");
-
- testObjectSerialization(new SimpleTypes(3637, 54876486548L, (byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523));
- testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) -65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 0.0000001));
- }
-
- @Test
- public void testArrayTypes() {
- {
- int[] array = new int[] {1, 2, 3, 4, 5};
- testObjectSerialization(array);
- }
- {
- long[] array = new long[] {1, 2, 3, 4, 5};
- testObjectSerialization(array);
- }
- {
- float[] array = new float[] {1, 2, 3, 4, 5};
- testObjectSerialization(array);
- }
- {
- double[] array = new double[] {1, 2, 3, 4, 5};
- testObjectSerialization(array);
- }
- {
- String[] array = new String[] {"Oh", "my", "what", "do", "we", "have", "here", "?"};
- testObjectSerialization(array);
- }
- }
-
- @Test
- public void testEmptyArray() {
- {
- int[] array = new int[0];
- testObjectSerialization(array);
- }
- {
- long[] array = new long[0];
- testObjectSerialization(array);
- }
- {
- float[] array = new float[0];
- testObjectSerialization(array);
- }
- {
- double[] array = new double[0];
- testObjectSerialization(array);
- }
- {
- String[] array = new String[0];
- testObjectSerialization(array);
- }
- }
-
- @Test
- public void testObjects() {
- // simple object containing only primitives
- {
- testObjectSerialization(new Book(976243875L, "The Serialization Odysse", 42));
- }
-
- // object with collection
- {
- ArrayList<String> list = new ArrayList<String>();
- list.add("A");
- list.add("B");
- list.add("C");
- list.add("D");
- list.add("E");
-
- testObjectSerialization(new BookAuthor(976243875L, list, "Arno Nym"));
- }
-
- // object with empty collection
- {
- ArrayList<String> list = new ArrayList<String>();
- testObjectSerialization(new BookAuthor(987654321L, list, "The Saurus"));
- }
- }
-
- @Test
- public void testNestedObjectsWithCollections() {
- testObjectSerialization(new ComplexNestedObject2(true));
- }
-
- @Test
- public void testGeneratedObjectWithNullableFields() {
- List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" });
- List<Boolean> bools = Arrays.asList(true, true, false, false, true, false, true, true);
- Map<CharSequence, Long> map = new HashMap<CharSequence, Long>();
- map.put("1", 1L);
- map.put("2", 2L);
- map.put("3", 3L);
-
- byte[] b = new byte[16];
- new Random().nextBytes(b);
- Fixed16 f = new Fixed16(b);
- Address addr = new Address(new Integer(239), "6th Main", "Bangalore",
- "Karnataka", "560075");
- User user = new User("Freudenreich", 1337, "macintosh gray",
- 1234567890L, 3.1415926, null, true, strings, bools, null,
- Colors.GREEN, map, f, new Boolean(true), addr);
-
- testObjectSerialization(user);
- }
-
- @Test
- public void testVarLenCountEncoding() {
- try {
- long[] values = new long[] { 0, 1, 2, 3, 4, 0, 574, 45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL};
-
- // write
- ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
- {
- DataOutputStream dataOut = new DataOutputStream(baos);
-
- for (long val : values) {
- DataOutputEncoder.writeVarLongCount(dataOut, val);
- }
-
- dataOut.flush();
- dataOut.close();
- }
-
- // read
- {
- ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
- DataInputStream dataIn = new DataInputStream(bais);
-
- for (long val : values) {
- long read = DataInputDecoder.readVarLongCount(dataIn);
- assertEquals("Wrong var-len encoded value read.", val, read);
- }
- }
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test failed due to an exception: " + e.getMessage());
- }
- }
-
- private static <X> void testObjectSerialization(X obj) {
-
- try {
-
- // serialize
- ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
- {
- DataOutputStream dataOut = new DataOutputStream(baos);
- DataOutputEncoder encoder = new DataOutputEncoder();
- encoder.setOut(dataOut);
-
- @SuppressWarnings("unchecked")
- Class<X> clazz = (Class<X>) obj.getClass();
- ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz);
-
- writer.write(obj, encoder);
- dataOut.flush();
- dataOut.close();
- }
-
- byte[] data = baos.toByteArray();
- X result = null;
-
- // deserialize
- {
- ByteArrayInputStream bais = new ByteArrayInputStream(data);
- DataInputStream dataIn = new DataInputStream(bais);
- DataInputDecoder decoder = new DataInputDecoder();
- decoder.setIn(dataIn);
-
- @SuppressWarnings("unchecked")
- Class<X> clazz = (Class<X>) obj.getClass();
- ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz);
-
- // create a reuse object if possible, otherwise we have no reuse object
- X reuse = null;
- try {
- @SuppressWarnings("unchecked")
- X test = (X) obj.getClass().newInstance();
- reuse = test;
- } catch (Throwable t) {}
-
- result = reader.read(reuse, decoder);
- }
-
- // check
- final String message = "Deserialized object is not the same as the original";
-
- if (obj.getClass().isArray()) {
- Class<?> clazz = obj.getClass();
- if (clazz == byte[].class) {
- assertArrayEquals(message, (byte[]) obj, (byte[]) result);
- }
- else if (clazz == short[].class) {
- assertArrayEquals(message, (short[]) obj, (short[]) result);
- }
- else if (clazz == int[].class) {
- assertArrayEquals(message, (int[]) obj, (int[]) result);
- }
- else if (clazz == long[].class) {
- assertArrayEquals(message, (long[]) obj, (long[]) result);
- }
- else if (clazz == char[].class) {
- assertArrayEquals(message, (char[]) obj, (char[]) result);
- }
- else if (clazz == float[].class) {
- assertArrayEquals(message, (float[]) obj, (float[]) result, 0.0f);
- }
- else if (clazz == double[].class) {
- assertArrayEquals(message, (double[]) obj, (double[]) result, 0.0);
- } else {
- assertArrayEquals(message, (Object[]) obj, (Object[]) result);
- }
- } else {
- assertEquals(message, obj, result);
- }
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test failed due to an exception: " + e.getMessage());
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Test Objects
- // --------------------------------------------------------------------------------------------
-
- private static final class SimpleTypes {
-
- private final int iVal;
- private final long lVal;
- private final byte bVal;
- private final String sVal;
- private final short rVal;
- private final double dVal;
-
- public SimpleTypes() {
- this(0, 0, (byte) 0, "", (short) 0, 0);
- }
-
- public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
- this.iVal = iVal;
- this.lVal = lVal;
- this.bVal = bVal;
- this.sVal = sVal;
- this.rVal = rVal;
- this.dVal = dVal;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == SimpleTypes.class) {
- SimpleTypes other = (SimpleTypes) obj;
-
- return other.iVal == this.iVal &&
- other.lVal == this.lVal &&
- other.bVal == this.bVal &&
- other.sVal.equals(this.sVal) &&
- other.rVal == this.rVal &&
- other.dVal == this.dVal;
-
- } else {
- return false;
- }
- }
- }
-
- private static class ComplexNestedObject1 {
-
- private double doubleValue;
-
- private List<String> stringList;
-
- public ComplexNestedObject1() {}
-
- public ComplexNestedObject1(int offInit) {
- this.doubleValue = 6293485.6723 + offInit;
-
- this.stringList = new ArrayList<String>();
- this.stringList.add("A" + offInit);
- this.stringList.add("somewhat" + offInit);
- this.stringList.add("random" + offInit);
- this.stringList.add("collection" + offInit);
- this.stringList.add("of" + offInit);
- this.stringList.add("strings" + offInit);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == ComplexNestedObject1.class) {
- ComplexNestedObject1 other = (ComplexNestedObject1) obj;
- return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList);
- } else {
- return false;
- }
- }
- }
-
- private static class ComplexNestedObject2 {
-
- private long longValue;
-
- private Map<String, ComplexNestedObject1> theMap;
-
- public ComplexNestedObject2() {}
-
- public ComplexNestedObject2(boolean init) {
- this.longValue = 46547;
-
- this.theMap = new HashMap<String, ComplexNestedObject1>();
- this.theMap.put("36354L", new ComplexNestedObject1(43546543));
- this.theMap.put("785611L", new ComplexNestedObject1(45784568));
- this.theMap.put("43L", new ComplexNestedObject1(9876543));
- this.theMap.put("-45687L", new ComplexNestedObject1(7897615));
- this.theMap.put("1919876876896L", new ComplexNestedObject1(27154));
- this.theMap.put("-868468468L", new ComplexNestedObject1(546435));
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == ComplexNestedObject2.class) {
- ComplexNestedObject2 other = (ComplexNestedObject2) obj;
- return other.longValue == this.longValue && this.theMap.equals(other.theMap);
- } else {
- return false;
- }
- }
- }
-
- private static class Book {
-
- private long bookId;
- private String title;
- private long authorId;
-
- public Book() {}
-
- public Book(long bookId, String title, long authorId) {
- this.bookId = bookId;
- this.title = title;
- this.authorId = authorId;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == Book.class) {
- Book other = (Book) obj;
- return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title);
- } else {
- return false;
- }
- }
- }
-
- private static class BookAuthor {
-
- private long authorId;
- private List<String> bookTitles;
- private String authorName;
-
- public BookAuthor() {}
-
- public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
- this.authorId = authorId;
- this.bookTitles = bookTitles;
- this.authorName = authorName;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == BookAuthor.class) {
- BookAuthor other = (BookAuthor) obj;
- return other.authorName.equals(this.authorName) && other.authorId == this.authorId &&
- other.bookTitles.equals(this.bookTitles);
- } else {
- return false;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
deleted file mode 100644
index a8541b6..0000000
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro.testjar;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.Path;
-
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-/**
- * This file defines the classes for the AvroExternalJarProgramITCase.
- */
-public class AvroExternalJarProgram {
-
- private static final class Color {
-
- private String name;
- private double saturation;
-
- public Color() {
- name = "";
- saturation = 1.0;
- }
-
- public Color(String name, double saturation) {
- this.name = name;
- this.saturation = saturation;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public double getSaturation() {
- return saturation;
- }
-
- public void setSaturation(double saturation) {
- this.saturation = saturation;
- }
-
- @Override
- public String toString() {
- return name + '(' + saturation + ')';
- }
- }
-
- private static final class MyUser {
-
- private String name;
- private List<Color> colors;
-
- public MyUser() {
- name = "unknown";
- colors = new ArrayList<Color>();
- }
-
- public MyUser(String name, List<Color> colors) {
- this.name = name;
- this.colors = colors;
- }
-
- public String getName() {
- return name;
- }
-
- public List<Color> getColors() {
- return colors;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public void setColors(List<Color> colors) {
- this.colors = colors;
- }
-
- @Override
- public String toString() {
- return name + " : " + colors;
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- // --------------------------------------------------------------------------------------------
-
- private static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<String, MyUser> map(MyUser u) {
- String namePrefix = u.getName().substring(0, 1);
- return new Tuple2<String, MyUser>(namePrefix, u);
- }
- }
-
- private static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) {
- return val1;
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Test Data
- // --------------------------------------------------------------------------------------------
-
- private static final class Generator {
-
- private final Random rnd = new Random(2389756789345689276L);
-
- public MyUser nextUser() {
- return randomUser();
- }
-
- private MyUser randomUser() {
-
- int numColors = rnd.nextInt(5);
- ArrayList<Color> colors = new ArrayList<Color>(numColors);
- for (int i = 0; i < numColors; i++) {
- colors.add(new Color(randomString(), rnd.nextDouble()));
- }
-
- return new MyUser(randomString(), colors);
- }
-
- private String randomString() {
- char[] c = new char[this.rnd.nextInt(20) + 5];
-
- for (int i = 0; i < c.length; i++) {
- c[i] = (char) (this.rnd.nextInt(150) + 40);
- }
-
- return new String(c);
- }
- }
-
- public static void writeTestData(File testFile, int numRecords) throws IOException {
-
- DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class);
- DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter);
-
- dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);
-
- Generator generator = new Generator();
-
- for (int i = 0; i < numRecords; i++) {
- MyUser user = generator.nextUser();
- dataFileWriter.append(user);
- }
-
- dataFileWriter.close();
- }
-
-// public static void main(String[] args) throws Exception {
-// String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath();
-// writeTestData(new File(testDataFile), 50);
-// }
-
- public static void main(String[] args) throws Exception {
- String inputPath = args[0];
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class));
-
- DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper());
-
- result.output(new DiscardingOutputFormat<Tuple2<String, MyUser>>());
- env.execute();
- }
-}