You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/07/22 12:41:13 UTC
[73/92] [abbrv] git commit: prefix all projects in addons and quickstarts with flink-
prefix all projects in addons and quickstarts with flink-
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4771efc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4771efc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4771efc2
Branch: refs/heads/travis_test
Commit: 4771efc2d2c0c9f210d343cd71c228401fe096e6
Parents: fbc9338
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Jul 11 19:28:16 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Fri Jul 11 20:02:50 2014 +0200
----------------------------------------------------------------------
docs/cluster_setup.md | 2 +-
docs/java_api_guide.md | 2 +-
docs/java_api_quickstart.md | 2 +-
docs/scala_api_quickstart.md | 2 +-
flink-addons/avro/pom.xml | 158 ----
.../apache/flink/api/avro/AvroBaseValue.java | 153 ----
.../apache/flink/api/avro/DataInputDecoder.java | 213 ------
.../flink/api/avro/DataOutputEncoder.java | 183 -----
.../api/avro/FSDataInputStreamWrapper.java | 68 --
.../flink/api/java/io/AvroInputFormat.java | 124 ---
.../flink/api/java/io/AvroOutputFormat.java | 95 ---
.../java/record/io/avro/AvroInputFormat.java | 111 ---
.../record/io/avro/AvroRecordInputFormat.java | 374 ---------
.../avro/example/ReflectiveAvroTypeExample.java | 161 ----
.../api/java/record/io/avro/example/SUser.java | 25 -
.../api/java/record/io/avro/example/User.java | 269 -------
.../avro/src/test/assembly/test-assembly.xml | 31 -
.../api/avro/AvroExternalJarProgramITCase.java | 77 --
.../flink/api/avro/AvroOutputFormatTest.java | 168 -----
.../api/avro/AvroWithEmptyArrayITCase.java | 218 ------
.../flink/api/avro/EncoderDecoderTest.java | 523 -------------
.../avro/testjar/AvroExternalJarProgram.java | 232 ------
.../io/AvroInputFormatTypeExtractionTest.java | 81 --
.../io/avro/AvroRecordInputFormatTest.java | 169 -----
.../java/record/io/avro/generated/Colors.java | 32 -
.../api/java/record/io/avro/generated/User.java | 755 -------------------
.../avro/src/test/resources/avro/user.avsc | 19 -
.../avro/src/test/resources/testdata.avro | Bin 4572 -> 0 bytes
flink-addons/flink-avro/pom.xml | 158 ++++
.../apache/flink/api/avro/AvroBaseValue.java | 153 ++++
.../apache/flink/api/avro/DataInputDecoder.java | 213 ++++++
.../flink/api/avro/DataOutputEncoder.java | 183 +++++
.../api/avro/FSDataInputStreamWrapper.java | 68 ++
.../flink/api/java/io/AvroInputFormat.java | 124 +++
.../flink/api/java/io/AvroOutputFormat.java | 95 +++
.../java/record/io/avro/AvroInputFormat.java | 111 +++
.../record/io/avro/AvroRecordInputFormat.java | 374 +++++++++
.../avro/example/ReflectiveAvroTypeExample.java | 161 ++++
.../api/java/record/io/avro/example/SUser.java | 25 +
.../api/java/record/io/avro/example/User.java | 269 +++++++
.../src/test/assembly/test-assembly.xml | 31 +
.../api/avro/AvroExternalJarProgramITCase.java | 77 ++
.../flink/api/avro/AvroOutputFormatTest.java | 168 +++++
.../api/avro/AvroWithEmptyArrayITCase.java | 218 ++++++
.../flink/api/avro/EncoderDecoderTest.java | 523 +++++++++++++
.../avro/testjar/AvroExternalJarProgram.java | 232 ++++++
.../io/AvroInputFormatTypeExtractionTest.java | 81 ++
.../io/avro/AvroRecordInputFormatTest.java | 169 +++++
.../java/record/io/avro/generated/Colors.java | 32 +
.../api/java/record/io/avro/generated/User.java | 755 +++++++++++++++++++
.../src/test/resources/avro/user.avsc | 19 +
.../flink-avro/src/test/resources/testdata.avro | Bin 0 -> 4572 bytes
flink-addons/flink-hadoop-compatibility/pom.xml | 77 ++
.../mapred/HadoopInputFormat.java | 291 +++++++
.../mapred/HadoopOutputFormat.java | 168 +++++
.../mapred/example/WordCount.java | 120 +++
.../mapred/record/HadoopDataSink.java | 107 +++
.../mapred/record/HadoopDataSource.java | 86 +++
.../mapred/record/HadoopRecordInputFormat.java | 172 +++++
.../mapred/record/HadoopRecordOutputFormat.java | 156 ++++
.../datatypes/DefaultFlinkTypeConverter.java | 95 +++
.../datatypes/DefaultHadoopTypeConverter.java | 83 ++
.../record/datatypes/FlinkTypeConverter.java | 43 ++
.../datatypes/HadoopFileOutputCommitter.java | 196 +++++
.../record/datatypes/HadoopTypeConverter.java | 42 ++
.../datatypes/WritableComparableWrapper.java | 40 +
.../record/datatypes/WritableWrapper.java | 71 ++
.../datatypes/WritableWrapperConverter.java | 45 ++
.../mapred/record/example/WordCount.java | 184 +++++
.../example/WordCountWithOutputFormat.java | 173 +++++
.../mapred/utils/HadoopUtils.java | 87 +++
.../mapred/wrapper/HadoopDummyProgressable.java | 33 +
.../mapred/wrapper/HadoopDummyReporter.java | 71 ++
.../mapred/wrapper/HadoopInputSplit.java | 92 +++
.../mapreduce/HadoopInputFormat.java | 337 +++++++++
.../mapreduce/HadoopOutputFormat.java | 207 +++++
.../mapreduce/example/WordCount.java | 121 +++
.../mapreduce/utils/HadoopUtils.java | 83 ++
.../mapreduce/wrapper/HadoopInputSplit.java | 90 +++
.../mapred/HadoopInputOutputITCase.java | 46 ++
.../record/HadoopRecordInputOutputITCase.java | 54 ++
.../mapreduce/HadoopInputOutputITCase.java | 46 ++
flink-addons/flink-hbase/pom.xml | 111 +++
.../addons/hbase/GenericTableOutputFormat.java | 116 +++
.../flink/addons/hbase/HBaseDataSink.java | 47 ++
.../flink/addons/hbase/TableInputFormat.java | 407 ++++++++++
.../flink/addons/hbase/TableInputSplit.java | 168 +++++
.../flink/addons/hbase/common/HBaseKey.java | 87 +++
.../flink/addons/hbase/common/HBaseResult.java | 69 ++
.../flink/addons/hbase/common/HBaseUtil.java | 68 ++
.../addons/hbase/example/HBaseReadExample.java | 129 ++++
flink-addons/flink-jdbc/pom.xml | 59 ++
.../flink/api/java/io/jdbc/JDBCInputFormat.java | 356 +++++++++
.../api/java/io/jdbc/JDBCOutputFormat.java | 274 +++++++
.../api/java/io/jdbc/example/JDBCExample.java | 101 +++
.../java/record/io/jdbc/JDBCInputFormat.java | 389 ++++++++++
.../java/record/io/jdbc/JDBCOutputFormat.java | 353 +++++++++
.../record/io/jdbc/example/JDBCExample.java | 136 ++++
.../api/java/io/jdbc/JDBCInputFormatTest.java | 196 +++++
.../api/java/io/jdbc/JDBCOutputFormatTest.java | 260 +++++++
.../java/record/io/jdbc/DevNullLogStream.java | 30 +
.../record/io/jdbc/JDBCInputFormatTest.java | 210 ++++++
.../record/io/jdbc/JDBCOutputFormatTest.java | 227 ++++++
flink-addons/flink-spargel/pom.xml | 55 ++
.../flink/spargel/java/MessageIterator.java | 58 ++
.../flink/spargel/java/MessagingFunction.java | 284 +++++++
.../apache/flink/spargel/java/OutgoingEdge.java | 64 ++
.../spargel/java/VertexCentricIteration.java | 567 ++++++++++++++
.../spargel/java/VertexUpdateFunction.java | 145 ++++
.../examples/SpargelConnectedComponents.java | 79 ++
.../spargel/java/examples/SpargelPageRank.java | 117 +++
.../SpargelPageRankCountingVertices.java | 153 ++++
.../apache/flink/spargel/java/record/Edge.java | 43 ++
.../spargel/java/record/MessageIterator.java | 59 ++
.../spargel/java/record/MessagingFunction.java | 163 ++++
.../spargel/java/record/SpargelIteration.java | 280 +++++++
.../java/record/VertexUpdateFunction.java | 90 +++
.../flink/spargel/java/SpargelCompilerTest.java | 183 +++++
.../spargel/java/SpargelTranslationTest.java | 215 ++++++
.../SpargelConnectedComponentsITCase.java | 81 ++
flink-addons/flink-yarn/pom.xml | 60 ++
.../apache/flink/yarn/ApplicationMaster.java | 323 ++++++++
.../main/java/org/apache/flink/yarn/Client.java | 633 ++++++++++++++++
.../main/java/org/apache/flink/yarn/Utils.java | 266 +++++++
.../flink/yarn/YarnTaskManagerRunner.java | 68 ++
flink-addons/hadoop-compatibility/pom.xml | 77 --
.../mapred/HadoopInputFormat.java | 291 -------
.../mapred/HadoopOutputFormat.java | 168 -----
.../mapred/example/WordCount.java | 120 ---
.../mapred/record/HadoopDataSink.java | 107 ---
.../mapred/record/HadoopDataSource.java | 86 ---
.../mapred/record/HadoopRecordInputFormat.java | 172 -----
.../mapred/record/HadoopRecordOutputFormat.java | 156 ----
.../datatypes/DefaultFlinkTypeConverter.java | 95 ---
.../datatypes/DefaultHadoopTypeConverter.java | 83 --
.../record/datatypes/FlinkTypeConverter.java | 43 --
.../datatypes/HadoopFileOutputCommitter.java | 196 -----
.../record/datatypes/HadoopTypeConverter.java | 42 --
.../datatypes/WritableComparableWrapper.java | 40 -
.../record/datatypes/WritableWrapper.java | 71 --
.../datatypes/WritableWrapperConverter.java | 45 --
.../mapred/record/example/WordCount.java | 184 -----
.../example/WordCountWithOutputFormat.java | 173 -----
.../mapred/utils/HadoopUtils.java | 87 ---
.../mapred/wrapper/HadoopDummyProgressable.java | 33 -
.../mapred/wrapper/HadoopDummyReporter.java | 71 --
.../mapred/wrapper/HadoopInputSplit.java | 92 ---
.../mapreduce/HadoopInputFormat.java | 337 ---------
.../mapreduce/HadoopOutputFormat.java | 207 -----
.../mapreduce/example/WordCount.java | 121 ---
.../mapreduce/utils/HadoopUtils.java | 83 --
.../mapreduce/wrapper/HadoopInputSplit.java | 90 ---
.../mapred/HadoopInputOutputITCase.java | 46 --
.../record/HadoopRecordInputOutputITCase.java | 54 --
.../mapreduce/HadoopInputOutputITCase.java | 46 --
flink-addons/hbase/pom.xml | 111 ---
.../addons/hbase/GenericTableOutputFormat.java | 116 ---
.../flink/addons/hbase/HBaseDataSink.java | 47 --
.../flink/addons/hbase/TableInputFormat.java | 407 ----------
.../flink/addons/hbase/TableInputSplit.java | 168 -----
.../flink/addons/hbase/common/HBaseKey.java | 87 ---
.../flink/addons/hbase/common/HBaseResult.java | 69 --
.../flink/addons/hbase/common/HBaseUtil.java | 68 --
.../addons/hbase/example/HBaseReadExample.java | 129 ----
flink-addons/jdbc/pom.xml | 59 --
.../flink/api/java/io/jdbc/JDBCInputFormat.java | 356 ---------
.../api/java/io/jdbc/JDBCOutputFormat.java | 274 -------
.../api/java/io/jdbc/example/JDBCExample.java | 101 ---
.../java/record/io/jdbc/JDBCInputFormat.java | 389 ----------
.../java/record/io/jdbc/JDBCOutputFormat.java | 353 ---------
.../record/io/jdbc/example/JDBCExample.java | 136 ----
.../api/java/io/jdbc/JDBCInputFormatTest.java | 196 -----
.../api/java/io/jdbc/JDBCOutputFormatTest.java | 260 -------
.../java/record/io/jdbc/DevNullLogStream.java | 30 -
.../record/io/jdbc/JDBCInputFormatTest.java | 210 ------
.../record/io/jdbc/JDBCOutputFormatTest.java | 227 ------
flink-addons/pom.xml | 12 +-
flink-addons/spargel/pom.xml | 55 --
.../flink/spargel/java/MessageIterator.java | 58 --
.../flink/spargel/java/MessagingFunction.java | 284 -------
.../apache/flink/spargel/java/OutgoingEdge.java | 64 --
.../spargel/java/VertexCentricIteration.java | 567 --------------
.../spargel/java/VertexUpdateFunction.java | 145 ----
.../examples/SpargelConnectedComponents.java | 79 --
.../spargel/java/examples/SpargelPageRank.java | 117 ---
.../SpargelPageRankCountingVertices.java | 153 ----
.../apache/flink/spargel/java/record/Edge.java | 43 --
.../spargel/java/record/MessageIterator.java | 59 --
.../spargel/java/record/MessagingFunction.java | 163 ----
.../spargel/java/record/SpargelIteration.java | 280 -------
.../java/record/VertexUpdateFunction.java | 90 ---
.../flink/spargel/java/SpargelCompilerTest.java | 183 -----
.../spargel/java/SpargelTranslationTest.java | 215 ------
.../SpargelConnectedComponentsITCase.java | 81 --
flink-addons/yarn/pom.xml | 60 --
.../apache/flink/yarn/ApplicationMaster.java | 323 --------
.../main/java/org/apache/flink/yarn/Client.java | 633 ----------------
.../main/java/org/apache/flink/yarn/Utils.java | 266 -------
.../flink/yarn/YarnTaskManagerRunner.java | 68 --
flink-dist/pom.xml | 8 +-
flink-quickstart/flink-quickstart-java/pom.xml | 19 +
.../java/org/apache/flink/quickstart/Dummy.java | 28 +
.../main/resources/META-INF/maven/archetype.xml | 8 +
.../main/resources/archetype-resources/pom.xml | 59 ++
.../archetype-resources/src/main/java/Job.java | 53 ++
.../src/main/java/WordCountJob.java | 81 ++
flink-quickstart/flink-quickstart-scala/pom.xml | 19 +
.../java/org/apache/flink/quickstart/Dummy.java | 28 +
.../META-INF/maven/archetype-metadata.xml | 26 +
.../main/resources/META-INF/maven/archetype.xml | 7 +
.../main/resources/archetype-resources/pom.xml | 140 ++++
.../src/main/scala/Job.scala | 91 +++
flink-quickstart/pom.xml | 4 +-
flink-quickstart/quickstart-SNAPSHOT.sh | 6 +-
flink-quickstart/quickstart-java/pom.xml | 19 -
.../java/org/apache/flink/quickstart/Dummy.java | 28 -
.../main/resources/META-INF/maven/archetype.xml | 8 -
.../main/resources/archetype-resources/pom.xml | 59 --
.../archetype-resources/src/main/java/Job.java | 53 --
.../src/main/java/WordCountJob.java | 81 --
flink-quickstart/quickstart-scala-SNAPSHOT.sh | 2 +-
flink-quickstart/quickstart-scala/pom.xml | 19 -
.../java/org/apache/flink/quickstart/Dummy.java | 28 -
.../META-INF/maven/archetype-metadata.xml | 26 -
.../main/resources/META-INF/maven/archetype.xml | 7 -
.../main/resources/archetype-resources/pom.xml | 140 ----
.../src/main/scala/Job.scala | 91 ---
227 files changed, 16013 insertions(+), 16013 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/docs/cluster_setup.md
----------------------------------------------------------------------
diff --git a/docs/cluster_setup.md b/docs/cluster_setup.md
index af97916..7ebbfd2 100644
--- a/docs/cluster_setup.md
+++ b/docs/cluster_setup.md
@@ -269,7 +269,7 @@ node by setting the `jobmanager.heap.mb` and `taskmanager.heap.mb` keys.
The value is given in MB. If some worker nodes have more main memory which you
want to allocate to the Flink system you can overwrite the default value
-by setting an environment variable `STRATOSPHERE_TM_HEAP` on the respective
+by setting an environment variable `FLINK_TM_HEAP` on the respective
node.
Finally you must provide a list of all nodes in your cluster which shall be used
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/docs/java_api_guide.md
----------------------------------------------------------------------
diff --git a/docs/java_api_guide.md b/docs/java_api_guide.md
index 0f45c01..51b5459 100644
--- a/docs/java_api_guide.md
+++ b/docs/java_api_guide.md
@@ -72,7 +72,7 @@ The simplest way to do this is to use the [quickstart scripts](java_api_quicksta
```bash
mvn archetype:generate /
-DarchetypeGroupId=org.apache.flink/
- -DarchetypeArtifactId=quickstart-java /
+ -DarchetypeArtifactId=flink-quickstart-java /
-DarchetypeVersion={{site.FLINK_VERSION_STABLE }}
```
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/docs/java_api_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/java_api_quickstart.md b/docs/java_api_quickstart.md
index dfcef7a..d51088a 100644
--- a/docs/java_api_quickstart.md
+++ b/docs/java_api_quickstart.md
@@ -25,7 +25,7 @@ Use one of the following commands to __create a project__:
{% highlight bash %}
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
- -DarchetypeArtifactId=quickstart-java \
+ -DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion={{site.FLINK_VERSION_STABLE}}
{% endhighlight %}
This allows you to <strong>name your newly created project</strong>. It will interactively ask you for the groupId, artifactId, and package name.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/docs/scala_api_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/scala_api_quickstart.md b/docs/scala_api_quickstart.md
index dee9c1a..c376669 100644
--- a/docs/scala_api_quickstart.md
+++ b/docs/scala_api_quickstart.md
@@ -25,7 +25,7 @@ $ curl https://raw.githubusercontent.com/apache/incubator-flink/master/flink-qui
{% highlight bash %}
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
- -DarchetypeArtifactId=quickstart-scala \
+ -DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion={{site.FLINK_VERSION_STABLE}}
{% endhighlight %}
This allows you to <strong>name your newly created project</strong>. It will interactively ask you for the groupId, artifactId, and package name.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/avro/pom.xml b/flink-addons/avro/pom.xml
deleted file mode 100644
index fb77588..0000000
--- a/flink-addons/avro/pom.xml
+++ /dev/null
@@ -1,158 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>flink-addons</artifactId>
- <groupId>org.apache.flink</groupId>
- <version>0.6-incubating-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>avro</artifactId>
- <name>avro</name>
-
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- <version>1.7.5</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-mapred</artifactId>
- <version>1.7.5</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <!-- Exclude ExternalJar contents from regular build -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>**/src/test/java/org/apache/flink/api/avro/testjar/*.java</exclude>
- </excludes>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <executions>
- <execution>
- <id>create-test-dependency</id>
- <phase>process-test-classes</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <archive>
- <manifest>
- <mainClass>org.apache.flink.api.avro.testjar.AvroExternalJarProgram</mainClass>
- </manifest>
- </archive>
- <finalName>maven</finalName>
- <attach>false</attach>
- <descriptors>
- <descriptor>src/test/assembly/test-assembly.xml</descriptor>
- </descriptors>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
-
- <pluginManagement>
- <plugins>
- <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>
- org.apache.maven.plugins
- </groupId>
- <artifactId>
- maven-assembly-plugin
- </artifactId>
- <versionRange>
- [2.4,)
- </versionRange>
- <goals>
- <goal>single</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore></ignore>
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
-
- <profiles>
- <profile>
- <!-- A bug with java6 is causing the javadoc generation
- to fail because the test case contains junit?
- See https://github.com/stratosphere/stratosphere/pull/405#issuecomment-32591978
- for further links -->
- <id>disable-javadocs-in-java6</id>
- <activation>
- <jdk>(,1.6]</jdk> <!-- disable javadocs for java6 or lower. -->
- </activation>
- <properties>
- <maven.javadoc.skip>true</maven.javadoc.skip>
- </properties>
- </profile>
- </profiles>
-
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java b/flink-addons/avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
deleted file mode 100644
index 8b58a98..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
-import org.apache.flink.util.ReflectionUtil;
-
-
-public abstract class AvroBaseValue<T> extends AvroValue<T> implements Key<AvroBaseValue<T>> {
-
- private static final long serialVersionUID = 1L;
-
-
- public AvroBaseValue() {}
-
- public AvroBaseValue(T datum) {
- super(datum);
- }
-
-
- // --------------------------------------------------------------------------------------------
- // Serialization / Deserialization
- // --------------------------------------------------------------------------------------------
-
- private ReflectDatumWriter<T> writer;
- private ReflectDatumReader<T> reader;
-
- private DataOutputEncoder encoder;
- private DataInputDecoder decoder;
-
-
- @Override
- public void write(DataOutputView out) throws IOException {
- // the null flag
- if (datum() == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
-
- DataOutputEncoder encoder = getEncoder();
- encoder.setOut(out);
- getWriter().write(datum(), encoder);
- }
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- // the null flag
- if (in.readBoolean()) {
-
- DataInputDecoder decoder = getDecoder();
- decoder.setIn(in);
- datum(getReader().read(datum(), decoder));
- }
- }
-
- private ReflectDatumWriter<T> getWriter() {
- if (this.writer == null) {
- @SuppressWarnings("unchecked")
- Class<T> clazz = (Class<T>) datum().getClass();
- this.writer = new ReflectDatumWriter<T>(clazz);
- }
- return this.writer;
- }
-
- private ReflectDatumReader<T> getReader() {
- if (this.reader == null) {
- Class<T> datumClass = ReflectionUtil.getTemplateType1(getClass());
- this.reader = new ReflectDatumReader<T>(datumClass);
- }
- return this.reader;
- }
-
- private DataOutputEncoder getEncoder() {
- if (this.encoder == null) {
- this.encoder = new DataOutputEncoder();
- }
- return this.encoder;
- }
-
- private DataInputDecoder getDecoder() {
- if (this.decoder == null) {
- this.decoder = new DataInputDecoder();
- }
- return this.decoder;
- }
-
- // --------------------------------------------------------------------------------------------
- // Hashing / Equality
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return datum() == null ? 0 : datum().hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == this.getClass()) {
- Object otherDatum = ((AvroBaseValue<?>) obj).datum();
- Object thisDatum = datum();
-
- if (thisDatum == null) {
- return otherDatum == null;
- } else {
- return thisDatum.equals(otherDatum);
- }
- } else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return "AvroBaseValue (" + datum() + ")";
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public int compareTo(AvroBaseValue<T> o) {
- Object otherDatum = o.datum();
- Object thisDatum = datum();
-
- if (thisDatum == null) {
- return otherDatum == null ? 0 : -1;
- } else {
- return otherDatum == null ? 1: ((Comparable<Object>) thisDatum).compareTo(otherDatum);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java b/flink-addons/avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
deleted file mode 100644
index 3c9fd34..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Decoder;
-import org.apache.avro.util.Utf8;
-
-
-public class DataInputDecoder extends Decoder {
-
- private final Utf8 stringDecoder = new Utf8();
-
- private DataInput in;
-
- public void setIn(DataInput in) {
- this.in = in;
- }
-
- // --------------------------------------------------------------------------------------------
- // primitives
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void readNull() {}
-
-
- @Override
- public boolean readBoolean() throws IOException {
- return in.readBoolean();
- }
-
- @Override
- public int readInt() throws IOException {
- return in.readInt();
- }
-
- @Override
- public long readLong() throws IOException {
- return in.readLong();
- }
-
- @Override
- public float readFloat() throws IOException {
- return in.readFloat();
- }
-
- @Override
- public double readDouble() throws IOException {
- return in.readDouble();
- }
-
- @Override
- public int readEnum() throws IOException {
- return readInt();
- }
-
- // --------------------------------------------------------------------------------------------
- // bytes
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void readFixed(byte[] bytes, int start, int length) throws IOException {
- in.readFully(bytes, start, length);
- }
-
- @Override
- public ByteBuffer readBytes(ByteBuffer old) throws IOException {
- int length = readInt();
- ByteBuffer result;
- if (old != null && length <= old.capacity() && old.hasArray()) {
- result = old;
- result.clear();
- } else {
- result = ByteBuffer.allocate(length);
- }
- in.readFully(result.array(), result.arrayOffset() + result.position(), length);
- result.limit(length);
- return result;
- }
-
-
- @Override
- public void skipFixed(int length) throws IOException {
- skipBytes(length);
- }
-
- @Override
- public void skipBytes() throws IOException {
- int num = readInt();
- skipBytes(num);
- }
-
- // --------------------------------------------------------------------------------------------
- // strings
- // --------------------------------------------------------------------------------------------
-
-
- @Override
- public Utf8 readString(Utf8 old) throws IOException {
- int length = readInt();
- Utf8 result = (old != null ? old : new Utf8());
- result.setByteLength(length);
-
- if (length > 0) {
- in.readFully(result.getBytes(), 0, length);
- }
-
- return result;
- }
-
- @Override
- public String readString() throws IOException {
- return readString(stringDecoder).toString();
- }
-
- @Override
- public void skipString() throws IOException {
- int len = readInt();
- skipBytes(len);
- }
-
- // --------------------------------------------------------------------------------------------
- // collection types
- // --------------------------------------------------------------------------------------------
-
- @Override
- public long readArrayStart() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long arrayNext() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long skipArray() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long readMapStart() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long mapNext() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long skipMap() throws IOException {
- return readVarLongCount(in);
- }
-
- // --------------------------------------------------------------------------------------------
- // union
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int readIndex() throws IOException {
- return readInt();
- }
-
- // --------------------------------------------------------------------------------------------
- // utils
- // --------------------------------------------------------------------------------------------
-
- private void skipBytes(int num) throws IOException {
- while (num > 0) {
- num -= in.skipBytes(num);
- }
- }
-
- public static long readVarLongCount(DataInput in) throws IOException {
- long value = in.readUnsignedByte();
-
- if ((value & 0x80) == 0) {
- return value;
- }
- else {
- long curr;
- int shift = 7;
- value = value & 0x7f;
- while (((curr = in.readUnsignedByte()) & 0x80) != 0){
- value |= (curr & 0x7f) << shift;
- shift += 7;
- }
- value |= curr << shift;
- return value;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java b/flink-addons/avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
deleted file mode 100644
index 5463237..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Encoder;
-import org.apache.avro.util.Utf8;
-
-
-/**
- * An Avro {@link Encoder} that writes all values to a {@link DataOutput}.
- *
- * Primitives, enum ordinals, union indexes and byte/string lengths are written through
- * the corresponding {@link DataOutput} methods. Only the item counts of arrays and maps
- * use the variable-length encoding of {@link #writeVarLongCount(DataOutput, long)}.
- */
-public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
-
- private static final long serialVersionUID = 1L;
-
- /** The target of all writes. Has to be set via {@link #setOut(DataOutput)} before encoding. */
- private DataOutput out;
-
-
- /**
-  * Sets the output that all subsequent write calls go to.
-  *
-  * @param out The output to write the encoded data to.
-  */
- public void setOut(DataOutput out) {
-     this.out = out;
- }
-
-
- /** Does nothing, the encoder keeps no buffer of its own. */
- @Override
- public void flush() throws IOException {}
-
- // --------------------------------------------------------------------------------------------
- // primitives
- // --------------------------------------------------------------------------------------------
-
- /** Writes nothing, a null value occupies no bytes. */
- @Override
- public void writeNull() {}
-
-
- @Override
- public void writeBoolean(boolean b) throws IOException {
-     this.out.writeBoolean(b);
- }
-
- @Override
- public void writeInt(int n) throws IOException {
-     this.out.writeInt(n);
- }
-
- @Override
- public void writeLong(long n) throws IOException {
-     this.out.writeLong(n);
- }
-
- @Override
- public void writeFloat(float f) throws IOException {
-     this.out.writeFloat(f);
- }
-
- @Override
- public void writeDouble(double d) throws IOException {
-     this.out.writeDouble(d);
- }
-
- /** Writes the enum ordinal as a plain int. */
- @Override
- public void writeEnum(int e) throws IOException {
-     this.out.writeInt(e);
- }
-
-
- // --------------------------------------------------------------------------------------------
- // bytes
- // --------------------------------------------------------------------------------------------
-
- /** Writes the bytes as they are, without a length prefix. */
- @Override
- public void writeFixed(byte[] bytes, int start, int len) throws IOException {
-     this.out.write(bytes, start, len);
- }
-
- /** Writes the length as an int, followed by the bytes (if there are any). */
- @Override
- public void writeBytes(byte[] bytes, int start, int len) throws IOException {
-     this.out.writeInt(len);
-     if (len <= 0) {
-         return;
-     }
-     this.out.write(bytes, start, len);
- }
-
- /** Writes the number of remaining bytes as an int, followed by those bytes (if there are any). */
- @Override
- public void writeBytes(ByteBuffer bytes) throws IOException {
-     final int remaining = bytes.remaining();
-     this.out.writeInt(remaining);
-
-     if (remaining <= 0) {
-         return;
-     }
-     writeFixed(bytes);
- }
-
- // --------------------------------------------------------------------------------------------
- // strings
- // --------------------------------------------------------------------------------------------
-
- /** Writes the string as length-prefixed bytes, as obtained from {@link Utf8#getBytesFor(String)}. */
- @Override
- public void writeString(String str) throws IOException {
-     final byte[] encoded = Utf8.getBytesFor(str);
-     writeBytes(encoded, 0, encoded.length);
- }
-
- /** Writes the valid bytes of the given Utf8 as length-prefixed bytes. */
- @Override
- public void writeString(Utf8 utf8) throws IOException {
-     final int numBytes = utf8.getByteLength();
-     writeBytes(utf8.getBytes(), 0, numBytes);
- }
-
- // --------------------------------------------------------------------------------------------
- // collection types
- // --------------------------------------------------------------------------------------------
-
- /** Writes nothing, the item counts are written by {@link #setItemCount(long)}. */
- @Override
- public void writeArrayStart() {}
-
- /** Writes the given count in variable-length encoding. Counts that are not positive are not written. */
- @Override
- public void setItemCount(long itemCount) throws IOException {
-     if (itemCount <= 0) {
-         return;
-     }
-     writeVarLongCount(this.out, itemCount);
- }
-
- /** Writes nothing, items carry no per-item marker. */
- @Override
- public void startItem() {}
-
- @Override
- public void writeArrayEnd() throws IOException {
-     writeEndMarker();
- }
-
- /** Writes nothing, the item counts are written by {@link #setItemCount(long)}. */
- @Override
- public void writeMapStart() {}
-
- @Override
- public void writeMapEnd() throws IOException {
-     writeEndMarker();
- }
-
- /** Terminates an array or map with a single 0 byte, the short form of a var-length count of 0. */
- private void writeEndMarker() throws IOException {
-     this.out.write(0);
- }
-
- // --------------------------------------------------------------------------------------------
- // union
- // --------------------------------------------------------------------------------------------
-
- /** Writes the index of the union branch as a plain int. */
- @Override
- public void writeIndex(int unionIndex) throws IOException {
-     this.out.writeInt(unionIndex);
- }
-
- // --------------------------------------------------------------------------------------------
- // utils
- // --------------------------------------------------------------------------------------------
-
-
- /**
-  * Writes a non-negative count in variable-length encoding: seven bits per byte,
-  * least significant group first, with the high bit (0x80) set on every byte
-  * except the last one.
-  *
-  * @param out The output to write the encoded count to.
-  * @param val The count to write, must not be negative.
-  * @throws IOException Thrown, if the count is negative or if writing fails.
-  */
- public static final void writeVarLongCount(DataOutput out, long val) throws IOException {
-     if (val < 0) {
-         throw new IOException("Illegal count (must be non-negative): " + val);
-     }
-
-     long remaining = val;
-     // emit continuation bytes for as long as bits beyond the lowest seven are left
-     for (; (remaining >>> 7) != 0; remaining >>>= 7) {
-         out.write(((int) remaining) | 0x80);
-     }
-     out.write((int) remaining);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java b/flink-addons/avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
deleted file mode 100644
index cb4a739..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.avro;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.avro.file.SeekableInput;
-import org.apache.flink.core.fs.FSDataInputStream;
-
-
-/**
- * Adapts a Flink {@link FSDataInputStream} to Avro's {@link SeekableInput}, so that the
- * Avro file readers can read from it.
- *
- * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well)
- *
- * The wrapper keeps track of the position in the data stream.
- */
-public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
- private final FSDataInputStream stream;
- private final long len;
- private long pos;
-
- /**
-  * Creates a wrapper around the given stream. The tracked position starts at 0.
-  *
-  * @param stream The stream to read from.
-  * @param len The length that is reported by {@link #length()}.
-  */
- public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
-     this.stream = stream;
-     this.len = len;
-     this.pos = 0;
- }
-
- /** Returns the length that was given to the constructor. */
- @Override
- public long length() {
-     return len;
- }
-
- /**
-  * Reads from the wrapped stream and advances the tracked position by the number of
-  * bytes that were actually read.
-  *
-  * @return The result of the wrapped stream's read call, including a negative
-  *         end-of-stream indicator.
-  */
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
-     final int read = stream.read(b, off, len);
-     // the end-of-stream indicator (-1) is not a byte count and must not move the position backwards
-     if (read > 0) {
-         pos += read;
-     }
-     return read;
- }
-
- /** Seeks the wrapped stream to the given position and remembers that position. */
- @Override
- public void seek(long p) throws IOException {
-     stream.seek(p);
-     pos = p;
- }
-
- /** Returns the tracked position. The wrapped stream is not queried. */
- @Override
- public long tell() throws IOException {
-     return pos;
- }
-
- /** Closes the wrapped stream. */
- @Override
- public void close() throws IOException {
-     stream.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-addons/avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
deleted file mode 100644
index 1031d81..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.io;
-
-import java.io.IOException;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.TypeInformation;
-import org.apache.flink.util.InstantiationUtil;
-
-
-/**
- * Input format that reads an Avro file and returns its records as objects of type {@code E}.
- * Subclasses of Avro's SpecificRecordBase are read with a {@link SpecificDatumReader},
- * all other types with a {@link ReflectDatumReader}. The format marks itself as unsplittable.
- */
-public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Log LOG = LogFactory.getLog(AvroInputFormat.class);
-
-
- /** The type of the objects that the Avro records are read into. */
- private final Class<E> avroValueType;
-
- /** Whether nextRecord() fills the passed instance (true) or a newly created one (false). */
- private boolean reuseAvroValue = true;
-
-
- /** The reader over the Avro file of the current split, created in open(). */
- private transient FileReader<E> dataFileReader;
-
-
- /**
-  * Creates an input format that reads the given file into objects of the given type.
-  *
-  * @param filePath The path of the Avro file to read.
-  * @param type The type of the objects that the records are read into.
-  */
- public AvroInputFormat(Path filePath, Class<E> type) {
-     super(filePath);
-     this.avroValueType = type;
-     this.unsplittable = true;
- }
-
-
- /**
-  * Sets the flag whether to reuse the Avro value instance for all records.
-  * By default, the input format reuses the Avro value.
-  *
-  * @param reuseAvroValue True, if the input format should reuse the Avro value instance, false otherwise.
-  */
- public void setReuseAvroValue(boolean reuseAvroValue) {
-     this.reuseAvroValue = reuseAvroValue;
- }
-
- // --------------------------------------------------------------------------------------------
- // Typing
- // --------------------------------------------------------------------------------------------
-
- @Override
- public TypeInformation<E> getProducedType() {
-     return TypeExtractor.getForClass(this.avroValueType);
- }
-
- // --------------------------------------------------------------------------------------------
- // Input Format Methods
- // --------------------------------------------------------------------------------------------
-
- /**
-  * Opens the split and creates the Avro file reader on top of the opened stream.
-  */
- @Override
- public void open(FileInputSplit split) throws IOException {
-     super.open(split);
-
-     DatumReader<E> datumReader;
-     if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
-         datumReader = new SpecificDatumReader<E>(avroValueType);
-     } else {
-         datumReader = new ReflectDatumReader<E>(avroValueType);
-     }
-
-     LOG.info("Opening split " + split);
-
-     // hand over the length as a long: narrowing it to int corrupts the length beyond 2 GiB
-     SeekableInput in = new FSDataInputStreamWrapper(stream, split.getLength());
-
-     dataFileReader = DataFileReader.openReader(in, datumReader);
-     dataFileReader.sync(split.getStart());
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
-     return !dataFileReader.hasNext();
- }
-
- /**
-  * Reads the next record.
-  *
-  * @param reuseValue The instance to read into. It is replaced by a new instance, if
-  *                   value reuse has been switched off.
-  * @return The record that was read, or null, if the reader has no more records.
-  */
- @Override
- public E nextRecord(E reuseValue) throws IOException {
-     if (!dataFileReader.hasNext()) {
-         return null;
-     }
-
-     if (!reuseAvroValue) {
-         reuseValue = InstantiationUtil.instantiate(avroValueType, Object.class);
-     }
-
-     return dataFileReader.next(reuseValue);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-addons/avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
deleted file mode 100644
index 56c8214..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.io;
-
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.core.fs.Path;
-
-import java.io.IOException;
-
-/**
- * Output format that writes records of type {@code E} into an Avro data file.
- * Subclasses of Avro's SpecificRecordBase are written with a {@link SpecificDatumWriter},
- * all other types with a {@link ReflectDatumWriter}.
- */
-public class AvroOutputFormat<E> extends FileOutputFormat<E> {
-
- private static final long serialVersionUID = 1L;
-
- /** The type of the records that are written. */
- private final Class<E> avroValueType;
-
- /** Optional schema that replaces the schema derived from the record type. */
- private Schema userDefinedSchema = null;
-
- /** The writer of the Avro data file, created in open(). */
- private transient DataFileWriter<E> dataFileWriter;
-
-
- public AvroOutputFormat(Path filePath, Class<E> type) {
-     super(filePath);
-     this.avroValueType = type;
- }
-
- public AvroOutputFormat(Class<E> type) {
-     this.avroValueType = type;
- }
-
- /**
-  * Sets the schema to create the Avro file with. If no schema is set, the schema
-  * is derived from the record type when the format is opened.
-  *
-  * @param schema The schema to use, or null to use the derived schema.
-  */
- public void setSchema(Schema schema) {
-     this.userDefinedSchema = schema;
- }
-
- @Override
- public void writeRecord(E record) throws IOException {
-     dataFileWriter.append(record);
- }
-
- /**
-  * Opens the output stream and creates the Avro data file writer on top of it.
-  */
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
-     super.open(taskNumber, numTasks);
-     DatumWriter<E> datumWriter;
-     Schema schema = null;
-     if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
-         datumWriter = new SpecificDatumWriter<E>(avroValueType);
-         try {
-             schema = ((org.apache.avro.specific.SpecificRecordBase)avroValueType.newInstance()).getSchema();
-         } catch (InstantiationException e) {
-             // keep the original exception as the cause, so that its stack trace is not lost
-             throw new RuntimeException(e.getMessage(), e);
-         } catch (IllegalAccessException e) {
-             throw new RuntimeException(e.getMessage(), e);
-         }
-     } else {
-         datumWriter = new ReflectDatumWriter<E>(avroValueType);
-         schema = ReflectData.get().getSchema(avroValueType);
-     }
-     dataFileWriter = new DataFileWriter<E>(datumWriter);
-     // a schema set by the user takes precedence over the derived one
-     dataFileWriter.create(userDefinedSchema == null ? schema : userDefinedSchema, stream);
- }
-
- /**
-  * Flushes and closes the Avro writer and then closes the format itself. The format
-  * is closed even if the writer fails, or if it was never created.
-  */
- @Override
- public void close() throws IOException {
-     try {
-         if (dataFileWriter != null) {
-             dataFileWriter.flush();
-             dataFileWriter.close();
-         }
-     } finally {
-         super.close();
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java b/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
deleted file mode 100644
index ab96895..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.java.record.io.FileInputFormat;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.ReflectionUtil;
-
-
-/**
- * Input format for the Record API that reads an Avro file and returns each Avro value
- * wrapped in an {@link AvroBaseValue}, which is set as field 0 of the record.
- * The format marks itself as unsplittable.
- */
-public class AvroInputFormat<E> extends FileInputFormat {
-
- private static final long serialVersionUID = 1L;
-
- private static final Log LOG = LogFactory.getLog(AvroInputFormat.class);
-
-
- /** The class of the wrapper that carries the Avro values inside the records. */
- private final Class<? extends AvroBaseValue<E>> avroWrapperTypeClass;
-
- /** The type of the Avro values that are read from the file. */
- private final Class<E> avroValueType;
-
-
- /** The reader over the Avro file of the current split, created in open(). */
- private transient FileReader<E> dataFileReader;
-
- /** The Avro value instance that is handed back to the reader for reuse. */
- private transient E reuseAvroValue;
-
- /** The wrapper instance that is put into every returned record. */
- private transient AvroBaseValue<E> wrapper;
-
-
- /**
-  * Creates an input format for the given wrapper class. The Avro value type is
-  * obtained from the wrapper class via {@code ReflectionUtil.getTemplateType1}.
-  */
- public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass) {
-     this.avroWrapperTypeClass = wrapperClass;
-     this.avroValueType = ReflectionUtil.getTemplateType1(wrapperClass);
-     this.unsplittable = true;
- }
-
- /**
-  * Creates an input format for the given wrapper class and Avro value type.
-  */
- public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass, Class<E> avroType) {
-     this.avroValueType = avroType;
-     this.avroWrapperTypeClass = wrapperClass;
-     this.unsplittable = true;
- }
-
- /**
-  * Opens the split, instantiates the wrapper and creates the Avro file reader on top
-  * of the opened stream.
-  */
- @Override
- public void open(FileInputSplit split) throws IOException {
-     super.open(split);
-
-     this.wrapper = InstantiationUtil.instantiate(avroWrapperTypeClass, AvroBaseValue.class);
-
-     DatumReader<E> datumReader;
-     if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
-         datumReader = new SpecificDatumReader<E>(avroValueType);
-     } else {
-         datumReader = new ReflectDatumReader<E>(avroValueType);
-     }
-
-     LOG.info("Opening split " + split);
-
-     // hand over the length as a long: narrowing it to int corrupts the length beyond 2 GiB
-     SeekableInput in = new FSDataInputStreamWrapper(stream, split.getLength());
-
-     dataFileReader = DataFileReader.openReader(in, datumReader);
-     dataFileReader.sync(split.getStart());
-
-     reuseAvroValue = null;
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
-     return !dataFileReader.hasNext();
- }
-
- /**
-  * Reads the next Avro value and sets it, wrapped, as field 0 of the given record.
-  *
-  * @return The given record, or null, if the reader has no more values.
-  */
- @Override
- public Record nextRecord(Record record) throws IOException {
-     if (!dataFileReader.hasNext()) {
-         return null;
-     }
-
-     reuseAvroValue = dataFileReader.next(reuseAvroValue);
-     wrapper.datum(reuseAvroValue);
-     record.setField(0, wrapper);
-     return record;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java b/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
deleted file mode 100644
index 1464ca9..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.avro;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.java.record.io.FileInputFormat;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.ListValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.MapValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-
-/**
- * Input format to read Avro files.
- *
- * The input format currently supports only flat avro schemas. So there is no
- * support for complex types except for nullable primitive fields, e.g.
- * ["string", null] (See
- * http://avro.apache.org/docs/current/spec.html#schema_complex)
- *
- */
-public class AvroRecordInputFormat extends FileInputFormat {
- private static final long serialVersionUID = 1L;
-
- private static final Log LOG = LogFactory.getLog(AvroRecordInputFormat.class);
-
- private FileReader<GenericRecord> dataFileReader;
- private GenericRecord reuseAvroRecord = null;
-
- /**
-  * Opens the split and creates a generic Avro file reader on top of the opened stream.
-  */
- @Override
- public void open(FileInputSplit split) throws IOException {
-     super.open(split);
-     DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
-     // hand over the length as a long: narrowing it to int corrupts the length beyond 2 GiB
-     SeekableInput in = new FSDataInputStreamWrapper(stream, split.getLength());
-     LOG.info("Opening split " + split);
-     dataFileReader = DataFileReader.openReader(in, datumReader);
-     dataFileReader.sync(split.getStart());
- }
-
- /**
-  * Tells whether the Avro reader has no further record to return.
-  */
- @Override
- public boolean reachedEnd() throws IOException {
-     final boolean hasMoreRecords = dataFileReader.hasNext();
-     return !hasMoreRecords;
- }
-
- /**
-  * Reads the next Avro record and copies its fields into the given record. Each
-  * field is set at the position it has in the Avro schema.
-  *
-  * @param record The record to fill, must not be null.
-  * @return The given record, or null, if the reader has no more records.
-  */
- @Override
- public Record nextRecord(Record record) throws IOException {
-     if (!dataFileReader.hasNext()) {
-         return null;
-     }
-     if (record == null) {
-         throw new IllegalArgumentException("Empty PactRecord given");
-     }
-
-     reuseAvroRecord = dataFileReader.next(reuseAvroRecord);
-     for (Field avroField : reuseAvroRecord.getSchema().getFields()) {
-         final int position = avroField.pos();
-         final Value converted = convertAvroToPactValue(avroField, reuseAvroRecord.get(position));
-         record.setField(position, converted);
-         // NOTE(review): the binary representation is updated after every single field,
-         // presumably because the conversion hands out shared value instances (sString,
-         // sInt, ...) that the next field may overwrite - confirm before hoisting this call
-         record.updateBinaryRepresenation();
-     }
-
-     return record;
- }
-
-
- /**
-  * Converts the Avro value of a field into a {@link Value}. Arrays, enums and maps are
-  * handled here, every other type is passed on to the primitive conversion.
-  *
-  * @param field The schema field that the value belongs to.
-  * @param avroRecord The Avro value of the field, may be null.
-  * @return The converted value, or null, if the Avro value is null.
-  */
- @SuppressWarnings("unchecked")
- private final Value convertAvroToPactValue(final Field field, final Object avroRecord) {
-     if (avroRecord == null) {
-         return null;
-     }
-     final Schema fieldSchema = field.schema();
-     final Type type = checkTypeConstraintsAndGetType(fieldSchema);
-
-     // check for complex types
-     // (complex type FIXED is not yet supported)
-     switch (type) {
-     case ARRAY:
-         return convertAvroArrayToListValue(fieldSchema.getElementType().getType(), (List<?>) avroRecord);
-     case ENUM: {
-         final List<String> validSymbols = fieldSchema.getEnumSymbols();
-         final String symbol = avroRecord.toString();
-         if (!validSymbols.contains(symbol)) {
-             throw new RuntimeException("The given Avro file contains field with a invalid enum symbol");
-         }
-         // enum symbols are returned through the shared string value
-         sString.setValue(symbol);
-         return sString;
-     }
-     case MAP:
-         return convertAvroMapToMapValue(fieldSchema.getValueType().getType(), (Map<CharSequence, ?>) avroRecord);
-
-     // primitive type
-     default:
-         return convertAvroPrimitiveToValue(type, avroRecord);
-     }
- }
-
- /**
-  * Copies the elements of an Avro array into a new list value of the matching element
-  * type. Every element is wrapped into a newly created value object.
-  *
-  * @param elementType The Avro type of the array elements.
-  * @param avroList The elements of the Avro array.
-  * @return The list value holding the converted elements.
-  * @throws RuntimeException Thrown, if the element type is not one of STRING, INT,
-  *                          BOOLEAN, DOUBLE, FLOAT or LONG.
-  */
- private final ListValue<?> convertAvroArrayToListValue(Type elementType, List<?> avroList) {
-     switch (elementType) {
-     case STRING: {
-         final StringListValue strings = new StringListValue();
-         for (Object element : avroList) {
-             strings.add(new StringValue((CharSequence) element));
-         }
-         return strings;
-     }
-     case INT: {
-         final IntListValue ints = new IntListValue();
-         for (Object element : avroList) {
-             ints.add(new IntValue((Integer) element));
-         }
-         return ints;
-     }
-     case BOOLEAN: {
-         final BooleanListValue booleans = new BooleanListValue();
-         for (Object element : avroList) {
-             booleans.add(new BooleanValue((Boolean) element));
-         }
-         return booleans;
-     }
-     case DOUBLE: {
-         final DoubleListValue doubles = new DoubleListValue();
-         for (Object element : avroList) {
-             doubles.add(new DoubleValue((Double) element));
-         }
-         return doubles;
-     }
-     case FLOAT: {
-         final FloatListValue floats = new FloatListValue();
-         for (Object element : avroList) {
-             floats.add(new FloatValue((Float) element));
-         }
-         return floats;
-     }
-     case LONG: {
-         final LongListValue longs = new LongListValue();
-         for (Object element : avroList) {
-             longs.add(new LongValue((Long) element));
-         }
-         return longs;
-     }
-     default:
-         throw new RuntimeException("Elements of type " + elementType + " are not supported for Avro arrays.");
-     }
- }
-
- /**
-  * Copies the entries of an Avro map into a new map value of the matching value type.
-  * Every key and every value is wrapped into a newly created value object.
-  *
-  * String values are treated as {@link CharSequence}, like the array and primitive
-  * conversions do, because Avro does not necessarily hand out {@link String} instances.
-  *
-  * @param mapValueType The Avro type of the map values.
-  * @param avroMap The entries of the Avro map.
-  * @return The map value holding the converted entries.
-  * @throws RuntimeException Thrown, if the value type is not one of STRING, INT,
-  *                          BOOLEAN, DOUBLE, FLOAT or LONG.
-  */
- private final MapValue<StringValue, ?> convertAvroMapToMapValue(Type mapValueType, Map<CharSequence, ?> avroMap) {
-     switch (mapValueType) {
-     case STRING:
-         StringMapValue sm = new StringMapValue();
-         for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-             // cast to CharSequence, a cast to String fails for Avro's Utf8 strings
-             sm.put(new StringValue(entry.getKey()), new StringValue((CharSequence) entry.getValue()));
-         }
-         return sm;
-     case INT:
-         IntMapValue im = new IntMapValue();
-         for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-             im.put(new StringValue(entry.getKey()), new IntValue((Integer) entry.getValue()));
-         }
-         return im;
-     case BOOLEAN:
-         BooleanMapValue bm = new BooleanMapValue();
-         for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-             bm.put(new StringValue(entry.getKey()), new BooleanValue((Boolean) entry.getValue()));
-         }
-         return bm;
-     case DOUBLE:
-         DoubleMapValue dm = new DoubleMapValue();
-         for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-             dm.put(new StringValue(entry.getKey()), new DoubleValue((Double) entry.getValue()));
-         }
-         return dm;
-     case FLOAT:
-         FloatMapValue fm = new FloatMapValue();
-         for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-             fm.put(new StringValue(entry.getKey()), new FloatValue((Float) entry.getValue()));
-         }
-         return fm;
-     case LONG:
-         LongMapValue lm = new LongMapValue();
-         for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-             lm.put(new StringValue(entry.getKey()), new LongValue((Long) entry.getValue()));
-         }
-         return lm;
-
-     default:
-         throw new RuntimeException("Map values of type " + mapValueType + " are not supported for Avro map.");
-     }
- }
-
- private StringValue sString = new StringValue();
- private IntValue sInt = new IntValue();
- private BooleanValue sBool = new BooleanValue();
- private DoubleValue sDouble = new DoubleValue();
- private FloatValue sFloat = new FloatValue();
- private LongValue sLong = new LongValue();
-
- /**
-  * Converts a primitive Avro value by storing it in the shared value instance of the
-  * matching type. The returned object is therefore overwritten by the next
-  * conversion of the same type.
-  *
-  * @param type The Avro type of the value.
-  * @param avroRecord The Avro value to convert.
-  * @return The shared value instance holding the converted value, or the
-  *         {@link NullValue} instance for the NULL type.
-  * @throws RuntimeException Thrown, if the type is not a supported primitive type.
-  */
- private final Value convertAvroPrimitiveToValue(Type type, Object avroRecord) {
-     switch (type) {
-     case STRING: {
-         final CharSequence chars = (CharSequence) avroRecord;
-         sString.setValue(chars);
-         return sString;
-     }
-     case INT: {
-         final Integer boxed = (Integer) avroRecord;
-         sInt.setValue(boxed);
-         return sInt;
-     }
-     case BOOLEAN: {
-         final Boolean boxed = (Boolean) avroRecord;
-         sBool.setValue(boxed);
-         return sBool;
-     }
-     case DOUBLE: {
-         final Double boxed = (Double) avroRecord;
-         sDouble.setValue(boxed);
-         return sDouble;
-     }
-     case FLOAT: {
-         final Float boxed = (Float) avroRecord;
-         sFloat.setValue(boxed);
-         return sFloat;
-     }
-     case LONG: {
-         final Long boxed = (Long) avroRecord;
-         sLong.setValue(boxed);
-         return sLong;
-     }
-     case NULL:
-         return NullValue.getInstance();
-     default: {
-         final String message = "Type " + type + " for AvroInputFormat is not implemented. Open an issue on GitHub.";
-         throw new RuntimeException(message);
-     }
-     }
- }
-
- /**
-  * Checks that the schema is of a supported kind and returns its effective type.
-  *
-  * Records are rejected. Of the unions, only those with a single non-union branch and
-  * those that pair one branch with NULL are accepted. For them, the type of the
-  * single or non-null branch is returned. All other schemas yield their own type.
-  *
-  * @param schema The schema of a field.
-  * @return The effective type of the field.
-  * @throws RuntimeException Thrown, if the schema is a record or an unsupported union.
-  */
- private final Type checkTypeConstraintsAndGetType(final Schema schema) {
-     final Type type = schema.getType();
-     if (type == Type.RECORD) {
-         throw new RuntimeException("The given Avro file contains complex data types which are not supported right now");
-     }
-
-     if (type == Type.UNION) {
-         List<Schema> types = schema.getTypes();
-         if (types.isEmpty()) {
-             // fail with a descriptive message instead of an IndexOutOfBoundsException below
-             throw new RuntimeException("The given Avro file contains an empty union");
-         }
-         if (types.size() > 2) {
-             throw new RuntimeException("The given Avro file contains a union that has more than two elements");
-         }
-         for (Schema branch : types) {
-             if (branch.getType() == Type.UNION) {
-                 throw new RuntimeException("The given Avro file contains a nested union");
-             }
-         }
-         if (types.size() == 1) {
-             return types.get(0).getType();
-         }
-         if (types.get(0).getType() == Type.NULL) {
-             return types.get(1).getType();
-         } else {
-             if (types.get(1).getType() != Type.NULL) {
-                 throw new RuntimeException("The given Avro file contains a union with two non-null types.");
-             }
-             return types.get(0).getType();
-         }
-     }
-     return type;
- }
-
- /**
-  * Creates the input splits, requesting as many splits as there are accepted Avro
-  * files: one for a single file, or the number of accepted non-directory entries
-  * for a directory. The given minimum number of splits is not used.
-  *
-  * @throws IOException Thrown, if the configured path does not pass the file filter,
-  *                     or if the file system cannot be accessed.
-  */
- @Override
- public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
-     final Path inputPath = this.filePath;
-     // get all the files that are involved in the splits
-     final FileSystem fileSystem = inputPath.getFileSystem();
-     final FileStatus inputStatus = fileSystem.getFileStatus(inputPath);
-
-     if (!acceptFile(inputStatus)) {
-         throw new IOException("The given file does not pass the file-filter");
-     }
-
-     if (!inputStatus.isDir()) {
-         return super.createInputSplits(1);
-     }
-
-     // input is directory. count all contained files that pass the filter
-     int numAvroFiles = 0;
-     for (FileStatus entry : fileSystem.listStatus(inputPath)) {
-         if (!entry.isDir() && acceptFile(entry)) {
-             numAvroFiles++;
-         }
-     }
-     return super.createInputSplits(numAvroFiles);
- }
-
- // --------------------------------------------------------------------------------------------
- // Concrete subclasses of ListValue and MapValue for all possible primitive types
- // --------------------------------------------------------------------------------------------
-
- public static class StringListValue extends ListValue<StringValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class IntListValue extends ListValue<IntValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class BooleanListValue extends ListValue<BooleanValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class DoubleListValue extends ListValue<DoubleValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class FloatListValue extends ListValue<FloatValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class LongListValue extends ListValue<LongValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class StringMapValue extends MapValue<StringValue, StringValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class IntMapValue extends MapValue<StringValue, IntValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class BooleanMapValue extends MapValue<StringValue, BooleanValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class DoubleMapValue extends MapValue<StringValue, DoubleValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class FloatMapValue extends MapValue<StringValue, FloatValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class LongMapValue extends MapValue<StringValue, LongValue> {
- private static final long serialVersionUID = 1L;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java b/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
deleted file mode 100644
index 9cdaef0..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.avro.example;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.Random;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-
-public class ReflectiveAvroTypeExample {
-
-
- public static void main(String[] args) throws Exception {
-
- GenericDataSource<UserGeneratingInputFormat> source = new GenericDataSource<UserGeneratingInputFormat>(UserGeneratingInputFormat.class);
-
- MapOperator mapper = MapOperator.builder(new NumberExtractingMapper())
- .input(source).name("le mapper").build();
-
- ReduceOperator reducer = ReduceOperator.builder(new ConcatenatingReducer(), IntValue.class, 1)
- .input(mapper).name("le reducer").build();
-
- GenericDataSink sink = new GenericDataSink(PrintingOutputFormat.class, reducer);
-
- Plan p = new Plan(sink);
- p.setDefaultParallelism(4);
-
- LocalExecutor.execute(p);
- }
-
-
- public static final class NumberExtractingMapper extends MapFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record record, Collector<Record> out) throws Exception {
- User u = record.getField(0, SUser.class).datum();
- record.setField(1, new IntValue(u.getFavoriteNumber()));
- out.collect(record);
- }
- }
-
-
- public static final class ConcatenatingReducer extends ReduceFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final Record result = new Record(2);
-
- @Override
- public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
- Record r = records.next();
-
- int num = r.getField(1, IntValue.class).getValue();
- String names = r.getField(0, SUser.class).datum().getFavoriteColor().toString();
-
- while (records.hasNext()) {
- r = records.next();
- names += " - " + r.getField(0, SUser.class).datum().getFavoriteColor().toString();
- }
-
- result.setField(0, new IntValue(num));
- result.setField(1, new StringValue(names));
- out.collect(result);
- }
-
- }
-
-
- public static final class UserGeneratingInputFormat extends GenericInputFormat {
-
- private static final long serialVersionUID = 1L;
-
- private static final int NUM = 100;
-
- private final Random rnd = new Random(32498562304986L);
-
- private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };
-
- private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };
-
- private int count;
-
-
- @Override
- public boolean reachedEnd() throws IOException {
- return count >= NUM;
- }
-
- @Override
- public Record nextRecord(Record record) throws IOException {
- count++;
-
- User u = new User();
- u.setName(NAMES[rnd.nextInt(NAMES.length)]);
- u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
- u.setFavoriteNumber(rnd.nextInt(87));
-
- SUser su = new SUser();
- su.datum(u);
-
- record.setField(0, su);
- return record;
- }
- }
-
- public static final class PrintingOutputFormat implements OutputFormat<Record> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void configure(Configuration parameters) {}
-
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {}
-
- @Override
- public void writeRecord(Record record) throws IOException {
- int color = record.getField(0, IntValue.class).getValue();
- String names = record.getField(1, StringValue.class).getValue();
-
- System.out.println(color + ": " + names);
- }
-
- @Override
- public void close() throws IOException {}
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java b/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
deleted file mode 100644
index 2fdfc05..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.avro.example;
-
-import org.apache.flink.api.avro.AvroBaseValue;
-
-public class SUser extends AvroBaseValue<User> {
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java b/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
deleted file mode 100644
index 7f48775..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- *
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.java.record.io.avro.example;
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
- public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
- public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
- @Deprecated public java.lang.CharSequence name;
- @Deprecated public java.lang.Integer favorite_number;
- @Deprecated public java.lang.CharSequence favorite_color;
-
- /**
- * Default constructor. Note that this does not initialize fields
- * to their default values from the schema. If that is desired then
- * one should use {@link \#newBuilder()}.
- */
- public User() {}
-
- /**
- * All-args constructor.
- */
- public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) {
- this.name = name;
- this.favorite_number = favorite_number;
- this.favorite_color = favorite_color;
- }
-
- public org.apache.avro.Schema getSchema() { return SCHEMA$; }
- // Used by DatumWriter. Applications should not call.
- public java.lang.Object get(int field$) {
- switch (field$) {
- case 0: return name;
- case 1: return favorite_number;
- case 2: return favorite_color;
- default: throw new org.apache.avro.AvroRuntimeException("Bad index");
- }
- }
- // Used by DatumReader. Applications should not call.
- @SuppressWarnings(value="unchecked")
- public void put(int field$, java.lang.Object value$) {
- switch (field$) {
- case 0: name = (java.lang.CharSequence)value$; break;
- case 1: favorite_number = (java.lang.Integer)value$; break;
- case 2: favorite_color = (java.lang.CharSequence)value$; break;
- default: throw new org.apache.avro.AvroRuntimeException("Bad index");
- }
- }
-
- /**
- * Gets the value of the 'name' field.
- */
- public java.lang.CharSequence getName() {
- return name;
- }
-
- /**
- * Sets the value of the 'name' field.
- * @param value the value to set.
- */
- public void setName(java.lang.CharSequence value) {
- this.name = value;
- }
-
- /**
- * Gets the value of the 'favorite_number' field.
- */
- public java.lang.Integer getFavoriteNumber() {
- return favorite_number;
- }
-
- /**
- * Sets the value of the 'favorite_number' field.
- * @param value the value to set.
- */
- public void setFavoriteNumber(java.lang.Integer value) {
- this.favorite_number = value;
- }
-
- /**
- * Gets the value of the 'favorite_color' field.
- */
- public java.lang.CharSequence getFavoriteColor() {
- return favorite_color;
- }
-
- /**
- * Sets the value of the 'favorite_color' field.
- * @param value the value to set.
- */
- public void setFavoriteColor(java.lang.CharSequence value) {
- this.favorite_color = value;
- }
-
- /** Creates a new User RecordBuilder */
- public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder() {
- return new org.apache.flink.api.java.record.io.avro.example.User.Builder();
- }
-
- /** Creates a new User RecordBuilder by copying an existing Builder */
- public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
- return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
- }
-
- /** Creates a new User RecordBuilder by copying an existing User instance */
- public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User other) {
- return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
- }
-
- /**
- * RecordBuilder for User instances.
- */
- public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
- implements org.apache.avro.data.RecordBuilder<User> {
-
- private java.lang.CharSequence name;
- private java.lang.Integer favorite_number;
- private java.lang.CharSequence favorite_color;
-
- /** Creates a new Builder */
- private Builder() {
- super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
- }
-
- /** Creates a Builder by copying an existing Builder */
- private Builder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
- super(other);
- if (isValidValue(fields()[0], other.name)) {
- this.name = data().deepCopy(fields()[0].schema(), other.name);
- fieldSetFlags()[0] = true;
- }
- if (isValidValue(fields()[1], other.favorite_number)) {
- this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
- fieldSetFlags()[1] = true;
- }
- if (isValidValue(fields()[2], other.favorite_color)) {
- this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
- fieldSetFlags()[2] = true;
- }
- }
-
- /** Creates a Builder by copying an existing User instance */
- private Builder(org.apache.flink.api.java.record.io.avro.example.User other) {
- super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
- if (isValidValue(fields()[0], other.name)) {
- this.name = data().deepCopy(fields()[0].schema(), other.name);
- fieldSetFlags()[0] = true;
- }
- if (isValidValue(fields()[1], other.favorite_number)) {
- this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
- fieldSetFlags()[1] = true;
- }
- if (isValidValue(fields()[2], other.favorite_color)) {
- this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
- fieldSetFlags()[2] = true;
- }
- }
-
- /** Gets the value of the 'name' field */
- public java.lang.CharSequence getName() {
- return name;
- }
-
- /** Sets the value of the 'name' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder setName(java.lang.CharSequence value) {
- validate(fields()[0], value);
- this.name = value;
- fieldSetFlags()[0] = true;
- return this;
- }
-
- /** Checks whether the 'name' field has been set */
- public boolean hasName() {
- return fieldSetFlags()[0];
- }
-
- /** Clears the value of the 'name' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder clearName() {
- name = null;
- fieldSetFlags()[0] = false;
- return this;
- }
-
- /** Gets the value of the 'favorite_number' field */
- public java.lang.Integer getFavoriteNumber() {
- return favorite_number;
- }
-
- /** Sets the value of the 'favorite_number' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) {
- validate(fields()[1], value);
- this.favorite_number = value;
- fieldSetFlags()[1] = true;
- return this;
- }
-
- /** Checks whether the 'favorite_number' field has been set */
- public boolean hasFavoriteNumber() {
- return fieldSetFlags()[1];
- }
-
- /** Clears the value of the 'favorite_number' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteNumber() {
- favorite_number = null;
- fieldSetFlags()[1] = false;
- return this;
- }
-
- /** Gets the value of the 'favorite_color' field */
- public java.lang.CharSequence getFavoriteColor() {
- return favorite_color;
- }
-
- /** Sets the value of the 'favorite_color' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) {
- validate(fields()[2], value);
- this.favorite_color = value;
- fieldSetFlags()[2] = true;
- return this;
- }
-
- /** Checks whether the 'favorite_color' field has been set */
- public boolean hasFavoriteColor() {
- return fieldSetFlags()[2];
- }
-
- /** Clears the value of the 'favorite_color' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteColor() {
- favorite_color = null;
- fieldSetFlags()[2] = false;
- return this;
- }
-
- @Override
- public User build() {
- try {
- User record = new User();
- record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
- record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
- record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
- return record;
- } catch (Exception e) {
- throw new org.apache.avro.AvroRuntimeException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/assembly/test-assembly.xml b/flink-addons/avro/src/test/assembly/test-assembly.xml
deleted file mode 100644
index 8316581..0000000
--- a/flink-addons/avro/src/test/assembly/test-assembly.xml
+++ /dev/null
@@ -1,31 +0,0 @@
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-
-<assembly>
- <id>test-jar</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>${project.build.testOutputDirectory}</directory>
- <outputDirectory>/</outputDirectory>
- <!--modify/add include to match your package(s) -->
- <includes>
- <include>org/apache/flink/api/avro/testjar/**</include>
- </includes>
- </fileSet>
- </fileSets>
-</assembly>
\ No newline at end of file