You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/07/22 12:41:13 UTC

[73/92] [abbrv] git commit: prefix all projects in addons and quickstarts with flink-

prefix all projects in addons and quickstarts with flink-


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4771efc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4771efc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4771efc2

Branch: refs/heads/travis_test
Commit: 4771efc2d2c0c9f210d343cd71c228401fe096e6
Parents: fbc9338
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Jul 11 19:28:16 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Fri Jul 11 20:02:50 2014 +0200

----------------------------------------------------------------------
 docs/cluster_setup.md                           |   2 +-
 docs/java_api_guide.md                          |   2 +-
 docs/java_api_quickstart.md                     |   2 +-
 docs/scala_api_quickstart.md                    |   2 +-
 flink-addons/avro/pom.xml                       | 158 ----
 .../apache/flink/api/avro/AvroBaseValue.java    | 153 ----
 .../apache/flink/api/avro/DataInputDecoder.java | 213 ------
 .../flink/api/avro/DataOutputEncoder.java       | 183 -----
 .../api/avro/FSDataInputStreamWrapper.java      |  68 --
 .../flink/api/java/io/AvroInputFormat.java      | 124 ---
 .../flink/api/java/io/AvroOutputFormat.java     |  95 ---
 .../java/record/io/avro/AvroInputFormat.java    | 111 ---
 .../record/io/avro/AvroRecordInputFormat.java   | 374 ---------
 .../avro/example/ReflectiveAvroTypeExample.java | 161 ----
 .../api/java/record/io/avro/example/SUser.java  |  25 -
 .../api/java/record/io/avro/example/User.java   | 269 -------
 .../avro/src/test/assembly/test-assembly.xml    |  31 -
 .../api/avro/AvroExternalJarProgramITCase.java  |  77 --
 .../flink/api/avro/AvroOutputFormatTest.java    | 168 -----
 .../api/avro/AvroWithEmptyArrayITCase.java      | 218 ------
 .../flink/api/avro/EncoderDecoderTest.java      | 523 -------------
 .../avro/testjar/AvroExternalJarProgram.java    | 232 ------
 .../io/AvroInputFormatTypeExtractionTest.java   |  81 --
 .../io/avro/AvroRecordInputFormatTest.java      | 169 -----
 .../java/record/io/avro/generated/Colors.java   |  32 -
 .../api/java/record/io/avro/generated/User.java | 755 -------------------
 .../avro/src/test/resources/avro/user.avsc      |  19 -
 .../avro/src/test/resources/testdata.avro       | Bin 4572 -> 0 bytes
 flink-addons/flink-avro/pom.xml                 | 158 ++++
 .../apache/flink/api/avro/AvroBaseValue.java    | 153 ++++
 .../apache/flink/api/avro/DataInputDecoder.java | 213 ++++++
 .../flink/api/avro/DataOutputEncoder.java       | 183 +++++
 .../api/avro/FSDataInputStreamWrapper.java      |  68 ++
 .../flink/api/java/io/AvroInputFormat.java      | 124 +++
 .../flink/api/java/io/AvroOutputFormat.java     |  95 +++
 .../java/record/io/avro/AvroInputFormat.java    | 111 +++
 .../record/io/avro/AvroRecordInputFormat.java   | 374 +++++++++
 .../avro/example/ReflectiveAvroTypeExample.java | 161 ++++
 .../api/java/record/io/avro/example/SUser.java  |  25 +
 .../api/java/record/io/avro/example/User.java   | 269 +++++++
 .../src/test/assembly/test-assembly.xml         |  31 +
 .../api/avro/AvroExternalJarProgramITCase.java  |  77 ++
 .../flink/api/avro/AvroOutputFormatTest.java    | 168 +++++
 .../api/avro/AvroWithEmptyArrayITCase.java      | 218 ++++++
 .../flink/api/avro/EncoderDecoderTest.java      | 523 +++++++++++++
 .../avro/testjar/AvroExternalJarProgram.java    | 232 ++++++
 .../io/AvroInputFormatTypeExtractionTest.java   |  81 ++
 .../io/avro/AvroRecordInputFormatTest.java      | 169 +++++
 .../java/record/io/avro/generated/Colors.java   |  32 +
 .../api/java/record/io/avro/generated/User.java | 755 +++++++++++++++++++
 .../src/test/resources/avro/user.avsc           |  19 +
 .../flink-avro/src/test/resources/testdata.avro | Bin 0 -> 4572 bytes
 flink-addons/flink-hadoop-compatibility/pom.xml |  77 ++
 .../mapred/HadoopInputFormat.java               | 291 +++++++
 .../mapred/HadoopOutputFormat.java              | 168 +++++
 .../mapred/example/WordCount.java               | 120 +++
 .../mapred/record/HadoopDataSink.java           | 107 +++
 .../mapred/record/HadoopDataSource.java         |  86 +++
 .../mapred/record/HadoopRecordInputFormat.java  | 172 +++++
 .../mapred/record/HadoopRecordOutputFormat.java | 156 ++++
 .../datatypes/DefaultFlinkTypeConverter.java    |  95 +++
 .../datatypes/DefaultHadoopTypeConverter.java   |  83 ++
 .../record/datatypes/FlinkTypeConverter.java    |  43 ++
 .../datatypes/HadoopFileOutputCommitter.java    | 196 +++++
 .../record/datatypes/HadoopTypeConverter.java   |  42 ++
 .../datatypes/WritableComparableWrapper.java    |  40 +
 .../record/datatypes/WritableWrapper.java       |  71 ++
 .../datatypes/WritableWrapperConverter.java     |  45 ++
 .../mapred/record/example/WordCount.java        | 184 +++++
 .../example/WordCountWithOutputFormat.java      | 173 +++++
 .../mapred/utils/HadoopUtils.java               |  87 +++
 .../mapred/wrapper/HadoopDummyProgressable.java |  33 +
 .../mapred/wrapper/HadoopDummyReporter.java     |  71 ++
 .../mapred/wrapper/HadoopInputSplit.java        |  92 +++
 .../mapreduce/HadoopInputFormat.java            | 337 +++++++++
 .../mapreduce/HadoopOutputFormat.java           | 207 +++++
 .../mapreduce/example/WordCount.java            | 121 +++
 .../mapreduce/utils/HadoopUtils.java            |  83 ++
 .../mapreduce/wrapper/HadoopInputSplit.java     |  90 +++
 .../mapred/HadoopInputOutputITCase.java         |  46 ++
 .../record/HadoopRecordInputOutputITCase.java   |  54 ++
 .../mapreduce/HadoopInputOutputITCase.java      |  46 ++
 flink-addons/flink-hbase/pom.xml                | 111 +++
 .../addons/hbase/GenericTableOutputFormat.java  | 116 +++
 .../flink/addons/hbase/HBaseDataSink.java       |  47 ++
 .../flink/addons/hbase/TableInputFormat.java    | 407 ++++++++++
 .../flink/addons/hbase/TableInputSplit.java     | 168 +++++
 .../flink/addons/hbase/common/HBaseKey.java     |  87 +++
 .../flink/addons/hbase/common/HBaseResult.java  |  69 ++
 .../flink/addons/hbase/common/HBaseUtil.java    |  68 ++
 .../addons/hbase/example/HBaseReadExample.java  | 129 ++++
 flink-addons/flink-jdbc/pom.xml                 |  59 ++
 .../flink/api/java/io/jdbc/JDBCInputFormat.java | 356 +++++++++
 .../api/java/io/jdbc/JDBCOutputFormat.java      | 274 +++++++
 .../api/java/io/jdbc/example/JDBCExample.java   | 101 +++
 .../java/record/io/jdbc/JDBCInputFormat.java    | 389 ++++++++++
 .../java/record/io/jdbc/JDBCOutputFormat.java   | 353 +++++++++
 .../record/io/jdbc/example/JDBCExample.java     | 136 ++++
 .../api/java/io/jdbc/JDBCInputFormatTest.java   | 196 +++++
 .../api/java/io/jdbc/JDBCOutputFormatTest.java  | 260 +++++++
 .../java/record/io/jdbc/DevNullLogStream.java   |  30 +
 .../record/io/jdbc/JDBCInputFormatTest.java     | 210 ++++++
 .../record/io/jdbc/JDBCOutputFormatTest.java    | 227 ++++++
 flink-addons/flink-spargel/pom.xml              |  55 ++
 .../flink/spargel/java/MessageIterator.java     |  58 ++
 .../flink/spargel/java/MessagingFunction.java   | 284 +++++++
 .../apache/flink/spargel/java/OutgoingEdge.java |  64 ++
 .../spargel/java/VertexCentricIteration.java    | 567 ++++++++++++++
 .../spargel/java/VertexUpdateFunction.java      | 145 ++++
 .../examples/SpargelConnectedComponents.java    |  79 ++
 .../spargel/java/examples/SpargelPageRank.java  | 117 +++
 .../SpargelPageRankCountingVertices.java        | 153 ++++
 .../apache/flink/spargel/java/record/Edge.java  |  43 ++
 .../spargel/java/record/MessageIterator.java    |  59 ++
 .../spargel/java/record/MessagingFunction.java  | 163 ++++
 .../spargel/java/record/SpargelIteration.java   | 280 +++++++
 .../java/record/VertexUpdateFunction.java       |  90 +++
 .../flink/spargel/java/SpargelCompilerTest.java | 183 +++++
 .../spargel/java/SpargelTranslationTest.java    | 215 ++++++
 .../SpargelConnectedComponentsITCase.java       |  81 ++
 flink-addons/flink-yarn/pom.xml                 |  60 ++
 .../apache/flink/yarn/ApplicationMaster.java    | 323 ++++++++
 .../main/java/org/apache/flink/yarn/Client.java | 633 ++++++++++++++++
 .../main/java/org/apache/flink/yarn/Utils.java  | 266 +++++++
 .../flink/yarn/YarnTaskManagerRunner.java       |  68 ++
 flink-addons/hadoop-compatibility/pom.xml       |  77 --
 .../mapred/HadoopInputFormat.java               | 291 -------
 .../mapred/HadoopOutputFormat.java              | 168 -----
 .../mapred/example/WordCount.java               | 120 ---
 .../mapred/record/HadoopDataSink.java           | 107 ---
 .../mapred/record/HadoopDataSource.java         |  86 ---
 .../mapred/record/HadoopRecordInputFormat.java  | 172 -----
 .../mapred/record/HadoopRecordOutputFormat.java | 156 ----
 .../datatypes/DefaultFlinkTypeConverter.java    |  95 ---
 .../datatypes/DefaultHadoopTypeConverter.java   |  83 --
 .../record/datatypes/FlinkTypeConverter.java    |  43 --
 .../datatypes/HadoopFileOutputCommitter.java    | 196 -----
 .../record/datatypes/HadoopTypeConverter.java   |  42 --
 .../datatypes/WritableComparableWrapper.java    |  40 -
 .../record/datatypes/WritableWrapper.java       |  71 --
 .../datatypes/WritableWrapperConverter.java     |  45 --
 .../mapred/record/example/WordCount.java        | 184 -----
 .../example/WordCountWithOutputFormat.java      | 173 -----
 .../mapred/utils/HadoopUtils.java               |  87 ---
 .../mapred/wrapper/HadoopDummyProgressable.java |  33 -
 .../mapred/wrapper/HadoopDummyReporter.java     |  71 --
 .../mapred/wrapper/HadoopInputSplit.java        |  92 ---
 .../mapreduce/HadoopInputFormat.java            | 337 ---------
 .../mapreduce/HadoopOutputFormat.java           | 207 -----
 .../mapreduce/example/WordCount.java            | 121 ---
 .../mapreduce/utils/HadoopUtils.java            |  83 --
 .../mapreduce/wrapper/HadoopInputSplit.java     |  90 ---
 .../mapred/HadoopInputOutputITCase.java         |  46 --
 .../record/HadoopRecordInputOutputITCase.java   |  54 --
 .../mapreduce/HadoopInputOutputITCase.java      |  46 --
 flink-addons/hbase/pom.xml                      | 111 ---
 .../addons/hbase/GenericTableOutputFormat.java  | 116 ---
 .../flink/addons/hbase/HBaseDataSink.java       |  47 --
 .../flink/addons/hbase/TableInputFormat.java    | 407 ----------
 .../flink/addons/hbase/TableInputSplit.java     | 168 -----
 .../flink/addons/hbase/common/HBaseKey.java     |  87 ---
 .../flink/addons/hbase/common/HBaseResult.java  |  69 --
 .../flink/addons/hbase/common/HBaseUtil.java    |  68 --
 .../addons/hbase/example/HBaseReadExample.java  | 129 ----
 flink-addons/jdbc/pom.xml                       |  59 --
 .../flink/api/java/io/jdbc/JDBCInputFormat.java | 356 ---------
 .../api/java/io/jdbc/JDBCOutputFormat.java      | 274 -------
 .../api/java/io/jdbc/example/JDBCExample.java   | 101 ---
 .../java/record/io/jdbc/JDBCInputFormat.java    | 389 ----------
 .../java/record/io/jdbc/JDBCOutputFormat.java   | 353 ---------
 .../record/io/jdbc/example/JDBCExample.java     | 136 ----
 .../api/java/io/jdbc/JDBCInputFormatTest.java   | 196 -----
 .../api/java/io/jdbc/JDBCOutputFormatTest.java  | 260 -------
 .../java/record/io/jdbc/DevNullLogStream.java   |  30 -
 .../record/io/jdbc/JDBCInputFormatTest.java     | 210 ------
 .../record/io/jdbc/JDBCOutputFormatTest.java    | 227 ------
 flink-addons/pom.xml                            |  12 +-
 flink-addons/spargel/pom.xml                    |  55 --
 .../flink/spargel/java/MessageIterator.java     |  58 --
 .../flink/spargel/java/MessagingFunction.java   | 284 -------
 .../apache/flink/spargel/java/OutgoingEdge.java |  64 --
 .../spargel/java/VertexCentricIteration.java    | 567 --------------
 .../spargel/java/VertexUpdateFunction.java      | 145 ----
 .../examples/SpargelConnectedComponents.java    |  79 --
 .../spargel/java/examples/SpargelPageRank.java  | 117 ---
 .../SpargelPageRankCountingVertices.java        | 153 ----
 .../apache/flink/spargel/java/record/Edge.java  |  43 --
 .../spargel/java/record/MessageIterator.java    |  59 --
 .../spargel/java/record/MessagingFunction.java  | 163 ----
 .../spargel/java/record/SpargelIteration.java   | 280 -------
 .../java/record/VertexUpdateFunction.java       |  90 ---
 .../flink/spargel/java/SpargelCompilerTest.java | 183 -----
 .../spargel/java/SpargelTranslationTest.java    | 215 ------
 .../SpargelConnectedComponentsITCase.java       |  81 --
 flink-addons/yarn/pom.xml                       |  60 --
 .../apache/flink/yarn/ApplicationMaster.java    | 323 --------
 .../main/java/org/apache/flink/yarn/Client.java | 633 ----------------
 .../main/java/org/apache/flink/yarn/Utils.java  | 266 -------
 .../flink/yarn/YarnTaskManagerRunner.java       |  68 --
 flink-dist/pom.xml                              |   8 +-
 flink-quickstart/flink-quickstart-java/pom.xml  |  19 +
 .../java/org/apache/flink/quickstart/Dummy.java |  28 +
 .../main/resources/META-INF/maven/archetype.xml |   8 +
 .../main/resources/archetype-resources/pom.xml  |  59 ++
 .../archetype-resources/src/main/java/Job.java  |  53 ++
 .../src/main/java/WordCountJob.java             |  81 ++
 flink-quickstart/flink-quickstart-scala/pom.xml |  19 +
 .../java/org/apache/flink/quickstart/Dummy.java |  28 +
 .../META-INF/maven/archetype-metadata.xml       |  26 +
 .../main/resources/META-INF/maven/archetype.xml |   7 +
 .../main/resources/archetype-resources/pom.xml  | 140 ++++
 .../src/main/scala/Job.scala                    |  91 +++
 flink-quickstart/pom.xml                        |   4 +-
 flink-quickstart/quickstart-SNAPSHOT.sh         |   6 +-
 flink-quickstart/quickstart-java/pom.xml        |  19 -
 .../java/org/apache/flink/quickstart/Dummy.java |  28 -
 .../main/resources/META-INF/maven/archetype.xml |   8 -
 .../main/resources/archetype-resources/pom.xml  |  59 --
 .../archetype-resources/src/main/java/Job.java  |  53 --
 .../src/main/java/WordCountJob.java             |  81 --
 flink-quickstart/quickstart-scala-SNAPSHOT.sh   |   2 +-
 flink-quickstart/quickstart-scala/pom.xml       |  19 -
 .../java/org/apache/flink/quickstart/Dummy.java |  28 -
 .../META-INF/maven/archetype-metadata.xml       |  26 -
 .../main/resources/META-INF/maven/archetype.xml |   7 -
 .../main/resources/archetype-resources/pom.xml  | 140 ----
 .../src/main/scala/Job.scala                    |  91 ---
 227 files changed, 16013 insertions(+), 16013 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/docs/cluster_setup.md
----------------------------------------------------------------------
diff --git a/docs/cluster_setup.md b/docs/cluster_setup.md
index af97916..7ebbfd2 100644
--- a/docs/cluster_setup.md
+++ b/docs/cluster_setup.md
@@ -269,7 +269,7 @@ node by setting the `jobmanager.heap.mb` and `taskmanager.heap.mb` keys.
 
 The value is given in MB. If some worker nodes have more main memory which you
 want to allocate to the Flink system you can overwrite the default value
-by setting an environment variable `STRATOSPHERE_TM_HEAP` on the respective
+by setting an environment variable `FLINK_TM_HEAP` on the respective
 node.
 
 Finally you must provide a list of all nodes in your cluster which shall be used

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/docs/java_api_guide.md
----------------------------------------------------------------------
diff --git a/docs/java_api_guide.md b/docs/java_api_guide.md
index 0f45c01..51b5459 100644
--- a/docs/java_api_guide.md
+++ b/docs/java_api_guide.md
@@ -72,7 +72,7 @@ The simplest way to do this is to use the [quickstart scripts](java_api_quicksta
 ```bash
 mvn archetype:generate /
     -DarchetypeGroupId=org.apache.flink/
-    -DarchetypeArtifactId=quickstart-java /
+    -DarchetypeArtifactId=flink-quickstart-java /
     -DarchetypeVersion={{site.FLINK_VERSION_STABLE }}
 ```
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/docs/java_api_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/java_api_quickstart.md b/docs/java_api_quickstart.md
index dfcef7a..d51088a 100644
--- a/docs/java_api_quickstart.md
+++ b/docs/java_api_quickstart.md
@@ -25,7 +25,7 @@ Use one of the following commands to __create a project__:
     {% highlight bash %}
     $ mvn archetype:generate                             \
       -DarchetypeGroupId=org.apache.flink              \
-      -DarchetypeArtifactId=quickstart-java            \
+      -DarchetypeArtifactId=flink-quickstart-java            \
       -DarchetypeVersion={{site.FLINK_VERSION_STABLE}}
     {% endhighlight %}
         This allows you to <strong>name your newly created project</strong>. It will interactively ask you for the groupId, artifactId, and package name.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/docs/scala_api_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/scala_api_quickstart.md b/docs/scala_api_quickstart.md
index dee9c1a..c376669 100644
--- a/docs/scala_api_quickstart.md
+++ b/docs/scala_api_quickstart.md
@@ -25,7 +25,7 @@ $ curl https://raw.githubusercontent.com/apache/incubator-flink/master/flink-qui
 {% highlight bash %}
 $ mvn archetype:generate                             \
   -DarchetypeGroupId=org.apache.flink              \
-  -DarchetypeArtifactId=quickstart-scala           \
+  -DarchetypeArtifactId=flink-quickstart-scala           \
   -DarchetypeVersion={{site.FLINK_VERSION_STABLE}}                  
 {% endhighlight %}
     This allows you to <strong>name your newly created project</strong>. It will interactively ask you for the groupId, artifactId, and package name.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/avro/pom.xml b/flink-addons/avro/pom.xml
deleted file mode 100644
index fb77588..0000000
--- a/flink-addons/avro/pom.xml
+++ /dev/null
@@ -1,158 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
-	
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-	
-	<modelVersion>4.0.0</modelVersion>
-	
-	<parent>
-		<artifactId>flink-addons</artifactId>
-		<groupId>org.apache.flink</groupId>
-		<version>0.6-incubating-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>avro</artifactId>
-	<name>avro</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.avro</groupId>
-			<artifactId>avro</artifactId>
-			<version>1.7.5</version>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.avro</groupId>
-			<artifactId>avro-mapred</artifactId>
-			<version>1.7.5</version>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		
-	</dependencies>
-
-	<build>
-		<plugins>
-		<!-- Exclude ExternalJar contents from regular build -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<configuration>
-					<excludes>
-						<exclude>**/src/test/java/org/apache/flink/api/avro/testjar/*.java</exclude>
-					</excludes>
-				</configuration>
-			</plugin>
-			<plugin>
-				<artifactId>maven-assembly-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>create-test-dependency</id>
-						<phase>process-test-classes</phase>
-						<goals>
-							<goal>single</goal>
-						</goals>
-						<configuration>
-							<archive>
-								<manifest>
-									<mainClass>org.apache.flink.api.avro.testjar.AvroExternalJarProgram</mainClass>
-								</manifest>
-							</archive>
-							<finalName>maven</finalName>
-							<attach>false</attach>
-							<descriptors>
-								<descriptor>src/test/assembly/test-assembly.xml</descriptor>
-							</descriptors>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-		
-		<pluginManagement>
-			<plugins>
-				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
-				<plugin>
-					<groupId>org.eclipse.m2e</groupId>
-					<artifactId>lifecycle-mapping</artifactId>
-					<version>1.0.0</version>
-					<configuration>
-						<lifecycleMappingMetadata>
-							<pluginExecutions>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>
-											org.apache.maven.plugins
-										</groupId>
-										<artifactId>
-											maven-assembly-plugin
-										</artifactId>
-										<versionRange>
-											[2.4,)
-										</versionRange>
-										<goals>
-											<goal>single</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore></ignore>
-									</action>
-								</pluginExecution>
-							</pluginExecutions>
-						</lifecycleMappingMetadata>
-					</configuration>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-	</build>
-
-	<profiles>
-		<profile>
-			<!-- A bug with java6 is causing the javadoc generation
-			to fail because the test case contains junit?
-			See https://github.com/stratosphere/stratosphere/pull/405#issuecomment-32591978
-			for further links -->
-			<id>disable-javadocs-in-java6</id>
-			<activation>
-				 <jdk>(,1.6]</jdk> <!-- disable javadocs for java6 or lower. -->
-			</activation>
-			<properties>
-				<maven.javadoc.skip>true</maven.javadoc.skip>
-			</properties>
-		</profile>
-	</profiles>
-
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java b/flink-addons/avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
deleted file mode 100644
index 8b58a98..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
-import org.apache.flink.util.ReflectionUtil;
-
-
-public abstract class AvroBaseValue<T> extends AvroValue<T> implements Key<AvroBaseValue<T>> {
-	
-	private static final long serialVersionUID = 1L;
-
-
-	public AvroBaseValue() {}
-	
-	public AvroBaseValue(T datum) {
-		super(datum);
-	}
-
-	
-	// --------------------------------------------------------------------------------------------
-	//  Serialization / Deserialization
-	// --------------------------------------------------------------------------------------------
-	
-	private ReflectDatumWriter<T> writer;
-	private ReflectDatumReader<T> reader;
-	
-	private DataOutputEncoder encoder;
-	private DataInputDecoder decoder;
-	
-	
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		// the null flag
-		if (datum() == null) {
-			out.writeBoolean(false);
-		} else {
-			out.writeBoolean(true);
-			
-			DataOutputEncoder encoder = getEncoder();
-			encoder.setOut(out);
-			getWriter().write(datum(), encoder);
-		}
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		// the null flag
-		if (in.readBoolean()) {
-			
-			DataInputDecoder decoder = getDecoder();
-			decoder.setIn(in);
-			datum(getReader().read(datum(), decoder));
-		}
-	}
-	
-	private ReflectDatumWriter<T> getWriter() {
-		if (this.writer == null) {
-			@SuppressWarnings("unchecked")
-			Class<T> clazz = (Class<T>) datum().getClass();
-			this.writer = new ReflectDatumWriter<T>(clazz);
-		}
-		return this.writer;
-	}
-	
-	private ReflectDatumReader<T> getReader() {
-		if (this.reader == null) {
-			Class<T> datumClass = ReflectionUtil.getTemplateType1(getClass());
-			this.reader = new ReflectDatumReader<T>(datumClass);
-		}
-		return this.reader;
-	}
-	
-	private DataOutputEncoder getEncoder() {
-		if (this.encoder == null) {
-			this.encoder = new DataOutputEncoder();
-		}
-		return this.encoder;
-	}
-	
-	private DataInputDecoder getDecoder() {
-		if (this.decoder == null) {
-			this.decoder = new DataInputDecoder();
-		}
-		return this.decoder;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Hashing / Equality
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return datum() == null ? 0 : datum().hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj.getClass() == this.getClass()) {
-			Object otherDatum = ((AvroBaseValue<?>) obj).datum();
-			Object thisDatum = datum();
-			
-			if (thisDatum == null) {
-				return otherDatum == null;
-			} else {
-				return thisDatum.equals(otherDatum);
-			}
-		} else {
-			return false;
-		}
-	}
-	
-	@Override
-	public String toString() {
-		return "AvroBaseValue (" + datum() + ")";
-	}
-	
-	@SuppressWarnings("unchecked")
-	@Override
-	public int compareTo(AvroBaseValue<T> o) {
-		Object otherDatum = o.datum();
-		Object thisDatum = datum();
-		
-		if (thisDatum == null) {
-			return otherDatum == null ? 0 : -1;
-		} else {
-			return otherDatum == null ? 1: ((Comparable<Object>) thisDatum).compareTo(otherDatum);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java b/flink-addons/avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
deleted file mode 100644
index 3c9fd34..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Decoder;
-import org.apache.avro.util.Utf8;
-
-
-public class DataInputDecoder extends Decoder {
-	
-	private final Utf8 stringDecoder = new Utf8();
-	
-	private DataInput in;
-	
-	public void setIn(DataInput in) {
-		this.in = in;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// primitives
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void readNull() {}
-	
-
-	@Override
-	public boolean readBoolean() throws IOException {
-		return in.readBoolean();
-	}
-
-	@Override
-	public int readInt() throws IOException {
-		return in.readInt();
-	}
-
-	@Override
-	public long readLong() throws IOException {
-		return in.readLong();
-	}
-
-	@Override
-	public float readFloat() throws IOException {
-		return in.readFloat();
-	}
-
-	@Override
-	public double readDouble() throws IOException {
-		return in.readDouble();
-	}
-	
-	@Override
-	public int readEnum() throws IOException {
-		return readInt();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// bytes
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void readFixed(byte[] bytes, int start, int length) throws IOException {
-		in.readFully(bytes, start, length);
-	}
-	
-	@Override
-	public ByteBuffer readBytes(ByteBuffer old) throws IOException {
-		int length = readInt();
-		ByteBuffer result;
-		if (old != null && length <= old.capacity() && old.hasArray()) {
-			result = old;
-			result.clear();
-		} else {
-			result = ByteBuffer.allocate(length);
-		}
-		in.readFully(result.array(), result.arrayOffset() + result.position(), length);
-		result.limit(length);
-		return result;
-	}
-	
-	
-	@Override
-	public void skipFixed(int length) throws IOException {
-		skipBytes(length);
-	}
-	
-	@Override
-	public void skipBytes() throws IOException {
-		int num = readInt();
-		skipBytes(num);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// strings
-	// --------------------------------------------------------------------------------------------
-	
-	
-	@Override
-	public Utf8 readString(Utf8 old) throws IOException {
-		int length = readInt();
-		Utf8 result = (old != null ? old : new Utf8());
-		result.setByteLength(length);
-		
-		if (length > 0) {
-			in.readFully(result.getBytes(), 0, length);
-		}
-		
-		return result;
-	}
-
-	@Override
-	public String readString() throws IOException {
-		return readString(stringDecoder).toString();
-	}
-
-	@Override
-	public void skipString() throws IOException {
-		int len = readInt();
-		skipBytes(len);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// collection types
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public long readArrayStart() throws IOException {
-		return readVarLongCount(in);
-	}
-
-	@Override
-	public long arrayNext() throws IOException {
-		return readVarLongCount(in);
-	}
-
-	@Override
-	public long skipArray() throws IOException {
-		return readVarLongCount(in);
-	}
-
-	@Override
-	public long readMapStart() throws IOException {
-		return readVarLongCount(in);
-	}
-
-	@Override
-	public long mapNext() throws IOException {
-		return readVarLongCount(in);
-	}
-
-	@Override
-	public long skipMap() throws IOException {
-		return readVarLongCount(in);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// union
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public int readIndex() throws IOException {
-		return readInt();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// utils
-	// --------------------------------------------------------------------------------------------
-	
-	private void skipBytes(int num) throws IOException {
-		while (num > 0) {
-			num -= in.skipBytes(num);
-		}
-	}
-	
-	public static long readVarLongCount(DataInput in) throws IOException {
-		long value = in.readUnsignedByte();
-
-		if ((value & 0x80) == 0) {
-			return value;
-		}
-		else {
-			long curr;
-			int shift = 7;
-			value = value & 0x7f;
-			while (((curr = in.readUnsignedByte()) & 0x80) != 0){
-				value |= (curr & 0x7f) << shift;
-				shift += 7;
-			}
-			value |= curr << shift;
-			return value;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java b/flink-addons/avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
deleted file mode 100644
index 5463237..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Encoder;
-import org.apache.avro.util.Utf8;
-
-
-public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private DataOutput out;
-	
-	
-	public void setOut(DataOutput out) {
-		this.out = out;
-	}
-
-
-	@Override
-	public void flush() throws IOException {}
-
-	// --------------------------------------------------------------------------------------------
-	// primitives
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void writeNull() {}
-	
-
-	@Override
-	public void writeBoolean(boolean b) throws IOException {
-		out.writeBoolean(b);
-	}
-
-	@Override
-	public void writeInt(int n) throws IOException {
-		out.writeInt(n);
-	}
-
-	@Override
-	public void writeLong(long n) throws IOException {
-		out.writeLong(n);
-	}
-
-	@Override
-	public void writeFloat(float f) throws IOException {
-		out.writeFloat(f);
-	}
-
-	@Override
-	public void writeDouble(double d) throws IOException {
-		out.writeDouble(d);
-	}
-	
-	@Override
-	public void writeEnum(int e) throws IOException {
-		out.writeInt(e);
-	}
-	
-	
-	// --------------------------------------------------------------------------------------------
-	// bytes
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void writeFixed(byte[] bytes, int start, int len) throws IOException {
-		out.write(bytes, start, len);
-	}
-	
-	@Override
-	public void writeBytes(byte[] bytes, int start, int len) throws IOException {
-		out.writeInt(len);
-		if (len > 0) {
-			out.write(bytes, start, len);
-		}
-	}
-	
-	@Override
-	public void writeBytes(ByteBuffer bytes) throws IOException {
-		int num = bytes.remaining();
-		out.writeInt(num);
-		
-		if (num > 0) {
-			writeFixed(bytes);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// strings
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void writeString(String str) throws IOException {
-		byte[] bytes = Utf8.getBytesFor(str);
-		writeBytes(bytes, 0, bytes.length);
-	}
-	
-	@Override
-	public void writeString(Utf8 utf8) throws IOException {
-		writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
-		
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// collection types
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void writeArrayStart() {}
-
-	@Override
-	public void setItemCount(long itemCount) throws IOException {
-		if (itemCount > 0) {
-			writeVarLongCount(out, itemCount);
-		}
-	}
-
-	@Override
-	public void startItem() {}
-
-	@Override
-	public void writeArrayEnd() throws IOException {
-		// write a single byte 0, shortcut for a var-length long of 0
-		out.write(0);
-	}
-
-	@Override
-	public void writeMapStart() {}
-
-	@Override
-	public void writeMapEnd() throws IOException {
-		// write a single byte 0, shortcut for a var-length long of 0
-		out.write(0);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// union
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void writeIndex(int unionIndex) throws IOException {
-		out.writeInt(unionIndex);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// utils
-	// --------------------------------------------------------------------------------------------
-		
-	
-	public static final void writeVarLongCount(DataOutput out, long val) throws IOException {
-		if (val < 0) {
-			throw new IOException("Illegal count (must be non-negative): " + val);
-		}
-		
-		while ((val & ~0x7FL) != 0) {
-			out.write(((int) val) | 0x80);
-			val >>>= 7;
-		}
-		out.write((int) val);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java b/flink-addons/avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
deleted file mode 100644
index cb4a739..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.avro;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.avro.file.SeekableInput;
-import org.apache.flink.core.fs.FSDataInputStream;
-
-
-/**
- * Code copied from org.apache.avro.mapred.FSInput (which is Apache licensed as well)
- * 
- * The wrapper keeps track of the position in the data stream.
- */
-public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
-	private final FSDataInputStream stream;
-	private final long len;
-	private long pos;
-
-	public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
-		this.stream = stream;
-		this.len = len;
-		this.pos = 0;
-	}
-
-	public long length() {
-		return len;
-	}
-
-	public int read(byte[] b, int off, int len) throws IOException {
-		int read;
-		read = stream.read(b, off, len);
-		pos += read;
-		return read;
-	}
-
-	public void seek(long p) throws IOException {
-		stream.seek(p);
-		pos = p;
-	}
-
-	public long tell() throws IOException {
-		return pos;
-	}
-
-	public void close() throws IOException {
-		stream.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-addons/avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
deleted file mode 100644
index 1031d81..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.io;
-
-import java.io.IOException;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.TypeInformation;
-import org.apache.flink.util.InstantiationUtil;
-
-
-public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> {
-	
-	private static final long serialVersionUID = 1L;
-
-	private static final Log LOG = LogFactory.getLog(AvroInputFormat.class);
-	
-	
-	private final Class<E> avroValueType;
-	
-	private boolean reuseAvroValue = true;
-	
-
-	private transient FileReader<E> dataFileReader;
-
-	
-	public AvroInputFormat(Path filePath, Class<E> type) {
-		super(filePath);
-		this.avroValueType = type;
-		this.unsplittable = true;
-	}
-	
-	
-	/**
-	 * Sets the flag whether to reuse the Avro value instance for all records.
-	 * By default, the input format reuses the Avro value.
-	 *
-	 * @param reuseAvroValue True, if the input format should reuse the Avro value instance, false otherwise.
-	 */
-	public void setReuseAvroValue(boolean reuseAvroValue) {
-		this.reuseAvroValue = reuseAvroValue;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// Typing
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public TypeInformation<E> getProducedType() {
-		return TypeExtractor.getForClass(this.avroValueType);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// Input Format Methods
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void open(FileInputSplit split) throws IOException {
-		super.open(split);
-
-		DatumReader<E> datumReader;
-		if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
-			datumReader = new SpecificDatumReader<E>(avroValueType);
-		} else {
-			datumReader = new ReflectDatumReader<E>(avroValueType);
-		}
-		
-		LOG.info("Opening split " + split);
-		
-		SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());
-		
-		dataFileReader = DataFileReader.openReader(in, datumReader);
-		dataFileReader.sync(split.getStart());
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return !dataFileReader.hasNext();
-	}
-
-	@Override
-	public E nextRecord(E reuseValue) throws IOException {
-		if (!dataFileReader.hasNext()) {
-			return null;
-		}
-		
-		if (!reuseAvroValue) {
-			reuseValue = InstantiationUtil.instantiate(avroValueType, Object.class);
-		}
-		
-		reuseValue = dataFileReader.next(reuseValue);
-		return reuseValue;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-addons/avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
deleted file mode 100644
index 56c8214..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.io;
-
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.core.fs.Path;
-
-import java.io.IOException;
-
-public class AvroOutputFormat<E> extends FileOutputFormat<E> {
-
-	private static final long serialVersionUID = 1L;
-
-	private final Class<E> avroValueType;
-
-	private Schema userDefinedSchema = null;
-
-	private transient DataFileWriter<E> dataFileWriter;
-
-
-	public AvroOutputFormat(Path filePath, Class<E> type) {
-		super(filePath);
-		this.avroValueType = type;
-	}
-
-	public AvroOutputFormat(Class<E> type) {
-		this.avroValueType = type;
-	}
-
-	public void setSchema(Schema schema) {
-		this.userDefinedSchema = schema;
-	}
-
-	@Override
-	public void writeRecord(E record) throws IOException {
-		dataFileWriter.append(record);
-	}
-
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		super.open(taskNumber, numTasks);
-		DatumWriter<E> datumWriter;
-		Schema schema = null;
-		if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
-			datumWriter = new SpecificDatumWriter<E>(avroValueType);
-			try {
-				schema = ((org.apache.avro.specific.SpecificRecordBase)avroValueType.newInstance()).getSchema();
-			} catch (InstantiationException e) {
-				throw new RuntimeException(e.getMessage());
-			} catch (IllegalAccessException e) {
-				throw new RuntimeException(e.getMessage());
-			}
-		} else {
-			datumWriter = new ReflectDatumWriter<E>(avroValueType);
-			schema = ReflectData.get().getSchema(avroValueType);
-		}
-		dataFileWriter = new DataFileWriter<E>(datumWriter);
-		if (userDefinedSchema == null) {
-			dataFileWriter.create(schema, stream);
-		} else {
-			dataFileWriter.create(userDefinedSchema, stream);
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		dataFileWriter.flush();
-		dataFileWriter.close();
-		super.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java b/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
deleted file mode 100644
index ab96895..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.java.record.io.FileInputFormat;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.ReflectionUtil;
-
-
-public class AvroInputFormat<E> extends FileInputFormat {
-	
-	private static final long serialVersionUID = 1L;
-
-	private static final Log LOG = LogFactory.getLog(AvroInputFormat.class);
-	
-	
-	private final Class<? extends AvroBaseValue<E>> avroWrapperTypeClass;
-	
-	private final Class<E> avroValueType;
-	
-
-	private transient FileReader<E> dataFileReader;
-	
-	private transient E reuseAvroValue;
-	
-	private transient AvroBaseValue<E> wrapper;
-	
-	
-	public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass) {
-		this.avroWrapperTypeClass = wrapperClass;
-		this.avroValueType = ReflectionUtil.getTemplateType1(wrapperClass);
-		this.unsplittable = true;
-	}
-	
-	public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass, Class<E> avroType) {
-		this.avroValueType = avroType;
-		this.avroWrapperTypeClass = wrapperClass;
-		this.unsplittable = true;
-	}
-
-	@Override
-	public void open(FileInputSplit split) throws IOException {
-		super.open(split);
-		
-		this.wrapper = InstantiationUtil.instantiate(avroWrapperTypeClass, AvroBaseValue.class);
-		
-		DatumReader<E> datumReader;
-		if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
-			datumReader = new SpecificDatumReader<E>(avroValueType);
-		} else {
-			datumReader = new ReflectDatumReader<E>(avroValueType);
-		}
-		
-		LOG.info("Opening split " + split);
-		
-		SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());
-		
-		dataFileReader = DataFileReader.openReader(in, datumReader);
-		dataFileReader.sync(split.getStart());
-		
-		reuseAvroValue = null;
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return !dataFileReader.hasNext();
-	}
-
-	@Override
-	public Record nextRecord(Record record) throws IOException {
-		if (!dataFileReader.hasNext()) {
-			return null;
-		}
-		
-		reuseAvroValue = dataFileReader.next(reuseAvroValue);
-		wrapper.datum(reuseAvroValue);
-		record.setField(0, wrapper);
-		return record;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java b/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
deleted file mode 100644
index 1464ca9..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.avro;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.java.record.io.FileInputFormat;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.ListValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.MapValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-
-/**
- * Input format to read Avro files.
- * 
- * The input format currently supports only flat avro schemas. So there is no
- * support for complex types except for nullable primitive fields, e.g.
- * ["string", null] (See
- * http://avro.apache.org/docs/current/spec.html#schema_complex)
- * 
- */
-public class AvroRecordInputFormat extends FileInputFormat {
-	private static final long serialVersionUID = 1L;
-
-	private static final Log LOG = LogFactory.getLog(AvroRecordInputFormat.class);
-
-	private FileReader<GenericRecord> dataFileReader;
-	private GenericRecord reuseAvroRecord = null;
-
-	@Override
-	public void open(FileInputSplit split) throws IOException {
-		super.open(split);
-		DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
-		SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());
-		LOG.info("Opening split " + split);
-		dataFileReader = DataFileReader.openReader(in, datumReader);
-		dataFileReader.sync(split.getStart());
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return !dataFileReader.hasNext();
-	}
-
-	@Override
-	public Record nextRecord(Record record) throws IOException {
-		if (!dataFileReader.hasNext()) {
-			return null;
-		}
-		if (record == null) {
-			throw new IllegalArgumentException("Empty PactRecord given");
-		}
-		reuseAvroRecord = dataFileReader.next(reuseAvroRecord);
-		final List<Field> fields = reuseAvroRecord.getSchema().getFields();
-		for (Field field : fields) {
-			final Value value = convertAvroToPactValue(field, reuseAvroRecord.get(field.pos()));
-			record.setField(field.pos(), value);
-			record.updateBinaryRepresenation();
-		}
-
-		return record;
-	}
-
-
-	@SuppressWarnings("unchecked")
-	private final Value convertAvroToPactValue(final Field field, final Object avroRecord) {
-		if (avroRecord == null) {
-			return null;
-		}
-		final Type type = checkTypeConstraintsAndGetType(field.schema());
-
-		// check for complex types
-		// (complex type FIXED is not yet supported)
-		switch (type) {
-			case ARRAY:
-				final Type elementType = field.schema().getElementType().getType();
-				final List<?> avroList = (List<?>) avroRecord;
-				return convertAvroArrayToListValue(elementType, avroList);
-			case ENUM:
-				final List<String> symbols = field.schema().getEnumSymbols();
-				final String avroRecordString = avroRecord.toString();
-				if (!symbols.contains(avroRecordString)) {
-					throw new RuntimeException("The given Avro file contains field with a invalid enum symbol");
-				}
-				sString.setValue(avroRecordString);
-				return sString;
-			case MAP:
-				final Type valueType = field.schema().getValueType().getType();
-				final Map<CharSequence, ?> avroMap = (Map<CharSequence, ?>) avroRecord;
-				return convertAvroMapToMapValue(valueType, avroMap);
-	
-			// primitive type
-			default:
-				return convertAvroPrimitiveToValue(type, avroRecord);
-
-		}
-	}
-
-	private final ListValue<?> convertAvroArrayToListValue(Type elementType, List<?> avroList) {
-		switch (elementType) {
-		case STRING:
-			StringListValue sl = new StringListValue();
-			for (Object item : avroList) {
-				sl.add(new StringValue((CharSequence) item));
-			}
-			return sl;
-		case INT:
-			IntListValue il = new IntListValue();
-			for (Object item : avroList) {
-				il.add(new IntValue((Integer) item));
-			}
-			return il;
-		case BOOLEAN:
-			BooleanListValue bl = new BooleanListValue();
-			for (Object item : avroList) {
-				bl.add(new BooleanValue((Boolean) item));
-			}
-			return bl;
-		case DOUBLE:
-			DoubleListValue dl = new DoubleListValue();
-			for (Object item : avroList) {
-				dl.add(new DoubleValue((Double) item));
-			}
-			return dl;
-		case FLOAT:
-			FloatListValue fl = new FloatListValue();
-			for (Object item : avroList) {
-				fl.add(new FloatValue((Float) item));
-			}
-			return fl;
-		case LONG:
-			LongListValue ll = new LongListValue();
-			for (Object item : avroList) {
-				ll.add(new LongValue((Long) item));
-			}
-			return ll;
-		default:
-			throw new RuntimeException("Elements of type " + elementType + " are not supported for Avro arrays.");
-		}
-	}
-
-	private final MapValue<StringValue, ?> convertAvroMapToMapValue(Type mapValueType, Map<CharSequence, ?> avroMap) {
-		switch (mapValueType) {
-		case STRING:
-			StringMapValue sm = new StringMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				sm.put(new StringValue((CharSequence) entry.getKey()), new StringValue((String) entry.getValue()));
-			}
-			return sm;
-		case INT:
-			IntMapValue im = new IntMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				im.put(new StringValue((CharSequence) entry.getKey()), new IntValue((Integer) entry.getValue()));
-			}
-			return im;
-		case BOOLEAN:
-			BooleanMapValue bm = new BooleanMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				bm.put(new StringValue((CharSequence) entry.getKey()), new BooleanValue((Boolean) entry.getValue()));
-			}
-			return bm;
-		case DOUBLE:
-			DoubleMapValue dm = new DoubleMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				dm.put(new StringValue((CharSequence) entry.getKey()), new DoubleValue((Double) entry.getValue()));
-			}
-			return dm;
-		case FLOAT:
-			FloatMapValue fm = new FloatMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				fm.put(new StringValue((CharSequence) entry.getKey()), new FloatValue((Float) entry.getValue()));
-			}
-			return fm;
-		case LONG:
-			LongMapValue lm = new LongMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				lm.put(new StringValue((CharSequence) entry.getKey()), new LongValue((Long) entry.getValue()));
-			}
-			return lm;
-
-		default:
-			throw new RuntimeException("Map values of type " + mapValueType + " are not supported for Avro map.");
-		}
-	}
-
-	private StringValue sString = new StringValue();
-	private IntValue sInt = new IntValue();
-	private BooleanValue sBool = new BooleanValue();
-	private DoubleValue sDouble = new DoubleValue();
-	private FloatValue sFloat = new FloatValue();
-	private LongValue sLong = new LongValue();
-	
-	private final Value convertAvroPrimitiveToValue(Type type, Object avroRecord) {
-		switch (type) {
-		case STRING:
-			sString.setValue((CharSequence) avroRecord);
-			return sString;
-		case INT:
-			sInt.setValue((Integer) avroRecord);
-			return sInt;
-		case BOOLEAN:
-			sBool.setValue((Boolean) avroRecord);
-			return sBool;
-		case DOUBLE:
-			sDouble.setValue((Double) avroRecord);
-			return sDouble;
-		case FLOAT:
-			sFloat.setValue((Float) avroRecord);
-			return sFloat;
-		case LONG:
-			sLong.setValue((Long) avroRecord);
-			return sLong;
-		case NULL:
-			return NullValue.getInstance();
-		default:
-			throw new RuntimeException(
-					"Type "
-							+ type
-							+ " for AvroInputFormat is not implemented. Open an issue on GitHub.");
-		}
-	}
-
-	private final Type checkTypeConstraintsAndGetType(final Schema schema) {
-		final Type type = schema.getType();
-		if (type == Type.RECORD) {
-			throw new RuntimeException("The given Avro file contains complex data types which are not supported right now");
-		}
-
-		if (type == Type.UNION) {
-			List<Schema> types = schema.getTypes();
-			if (types.size() > 2) {
-				throw new RuntimeException("The given Avro file contains a union that has more than two elements");
-			}
-			if (types.size() == 1 && types.get(0).getType() != Type.UNION) {
-				return types.get(0).getType();
-			}
-			if (types.get(0).getType() == Type.UNION || types.get(1).getType() == Type.UNION) {
-				throw new RuntimeException("The given Avro file contains a nested union");
-			}
-			if (types.get(0).getType() == Type.NULL) {
-				return types.get(1).getType();
-			} else {
-				if (types.get(1).getType() != Type.NULL) {
-					throw new RuntimeException("The given Avro file is contains a union with two non-null types.");
-				}
-				return types.get(0).getType();
-			}
-		}
-		return type;
-	}
-
-	/**
-	 * Set minNumSplits to number of files.
-	 */
-	@Override
-	public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
-		int numAvroFiles = 0;
-		final Path path = this.filePath;
-		// get all the files that are involved in the splits
-		final FileSystem fs = path.getFileSystem();
-		final FileStatus pathFile = fs.getFileStatus(path);
-
-		if (!acceptFile(pathFile)) {
-			throw new IOException("The given file does not pass the file-filter");
-		}
-		if (pathFile.isDir()) {
-			// input is directory. list all contained files
-			final FileStatus[] dir = fs.listStatus(path);
-			for (int i = 0; i < dir.length; i++) {
-				if (!dir[i].isDir() && acceptFile(dir[i])) {
-					numAvroFiles++;
-				}
-			}
-		} else {
-			numAvroFiles = 1;
-		}
-		return super.createInputSplits(numAvroFiles);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Concrete subclasses of ListValue and MapValue for all possible primitive types
-	// --------------------------------------------------------------------------------------------
-
-	public static class StringListValue extends ListValue<StringValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class IntListValue extends ListValue<IntValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class BooleanListValue extends ListValue<BooleanValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class DoubleListValue extends ListValue<DoubleValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class FloatListValue extends ListValue<FloatValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class LongListValue extends ListValue<LongValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class StringMapValue extends MapValue<StringValue, StringValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class IntMapValue extends MapValue<StringValue, IntValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class BooleanMapValue extends MapValue<StringValue, BooleanValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class DoubleMapValue extends MapValue<StringValue, DoubleValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class FloatMapValue extends MapValue<StringValue, FloatValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class LongMapValue extends MapValue<StringValue, LongValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java b/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
deleted file mode 100644
index 9cdaef0..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.avro.example;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.Random;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-
-public class ReflectiveAvroTypeExample {
-	
-	
-	public static void main(String[] args) throws Exception {
-		
-		GenericDataSource<UserGeneratingInputFormat> source = new GenericDataSource<UserGeneratingInputFormat>(UserGeneratingInputFormat.class);
-		
-		MapOperator mapper = MapOperator.builder(new NumberExtractingMapper())
-				.input(source).name("le mapper").build();
-		
-		ReduceOperator reducer = ReduceOperator.builder(new ConcatenatingReducer(), IntValue.class, 1)
-				.input(mapper).name("le reducer").build();
-		
-		GenericDataSink sink = new GenericDataSink(PrintingOutputFormat.class, reducer);
-		
-		Plan p = new Plan(sink);
-		p.setDefaultParallelism(4);
-		
-		LocalExecutor.execute(p);
-	}
-	
-	
-	public static final class NumberExtractingMapper extends MapFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			User u = record.getField(0, SUser.class).datum();
-			record.setField(1, new IntValue(u.getFavoriteNumber()));
-			out.collect(record);
-		}
-	}
-	
-	
-	public static final class ConcatenatingReducer extends ReduceFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		
-		private final Record result = new Record(2);
-
-		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-			Record r = records.next();
-			
-			int num = r.getField(1, IntValue.class).getValue();
-			String names = r.getField(0, SUser.class).datum().getFavoriteColor().toString();
-			
-			while (records.hasNext()) {
-				r = records.next();
-				names += " - " + r.getField(0, SUser.class).datum().getFavoriteColor().toString();
-			}
-			
-			result.setField(0, new IntValue(num));
-			result.setField(1,  new StringValue(names));
-			out.collect(result);
-		}
-
-	}
-	
-	
-	public static final class UserGeneratingInputFormat extends GenericInputFormat {
-
-		private static final long serialVersionUID = 1L;
-		
-		private static final int NUM = 100;
-		
-		private final Random rnd = new Random(32498562304986L);
-		
-		private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };
-		
-		private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };
-		
-		private int count;
-		
-
-		@Override
-		public boolean reachedEnd() throws IOException {
-			return count >= NUM;
-		}
-
-		@Override
-		public Record nextRecord(Record record) throws IOException {
-			count++;
-			
-			User u = new User();
-			u.setName(NAMES[rnd.nextInt(NAMES.length)]);
-			u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
-			u.setFavoriteNumber(rnd.nextInt(87));
-			
-			SUser su = new SUser();
-			su.datum(u);
-			
-			record.setField(0, su);
-			return record;
-		}
-	}
-	
-	public static final class PrintingOutputFormat implements OutputFormat<Record> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void configure(Configuration parameters) {}
-
-		@Override
-		public void open(int taskNumber, int numTasks) throws IOException {}
-
-		@Override
-		public void writeRecord(Record record) throws IOException {
-			int color = record.getField(0, IntValue.class).getValue();
-			String names = record.getField(1, StringValue.class).getValue();
-			
-			System.out.println(color + ": " + names);
-		}
-		
-		@Override
-		public void close() throws IOException {}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java b/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
deleted file mode 100644
index 2fdfc05..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.avro.example;
-
-import org.apache.flink.api.avro.AvroBaseValue;
-
-public class SUser extends AvroBaseValue<User> {
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java b/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
deleted file mode 100644
index 7f48775..0000000
--- a/flink-addons/avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- * 
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.java.record.io.avro.example;  
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
-  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-  @Deprecated public java.lang.CharSequence name;
-  @Deprecated public java.lang.Integer favorite_number;
-  @Deprecated public java.lang.CharSequence favorite_color;
-
-  /**
-   * Default constructor.  Note that this does not initialize fields
-   * to their default values from the schema.  If that is desired then
-   * one should use {@link \#newBuilder()}. 
-   */
-  public User() {}
-
-  /**
-   * All-args constructor.
-   */
-  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) {
-    this.name = name;
-    this.favorite_number = favorite_number;
-    this.favorite_color = favorite_color;
-  }
-
-  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
-  // Used by DatumWriter.  Applications should not call. 
-  public java.lang.Object get(int field$) {
-    switch (field$) {
-    case 0: return name;
-    case 1: return favorite_number;
-    case 2: return favorite_color;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-  // Used by DatumReader.  Applications should not call. 
-  @SuppressWarnings(value="unchecked")
-  public void put(int field$, java.lang.Object value$) {
-    switch (field$) {
-    case 0: name = (java.lang.CharSequence)value$; break;
-    case 1: favorite_number = (java.lang.Integer)value$; break;
-    case 2: favorite_color = (java.lang.CharSequence)value$; break;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-
-  /**
-   * Gets the value of the 'name' field.
-   */
-  public java.lang.CharSequence getName() {
-    return name;
-  }
-
-  /**
-   * Sets the value of the 'name' field.
-   * @param value the value to set.
-   */
-  public void setName(java.lang.CharSequence value) {
-    this.name = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_number' field.
-   */
-  public java.lang.Integer getFavoriteNumber() {
-    return favorite_number;
-  }
-
-  /**
-   * Sets the value of the 'favorite_number' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteNumber(java.lang.Integer value) {
-    this.favorite_number = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_color' field.
-   */
-  public java.lang.CharSequence getFavoriteColor() {
-    return favorite_color;
-  }
-
-  /**
-   * Sets the value of the 'favorite_color' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteColor(java.lang.CharSequence value) {
-    this.favorite_color = value;
-  }
-
-  /** Creates a new User RecordBuilder */
-  public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder() {
-    return new org.apache.flink.api.java.record.io.avro.example.User.Builder();
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing Builder */
-  public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
-    return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing User instance */
-  public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User other) {
-    return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
-  }
-  
-  /**
-   * RecordBuilder for User instances.
-   */
-  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
-    implements org.apache.avro.data.RecordBuilder<User> {
-
-    private java.lang.CharSequence name;
-    private java.lang.Integer favorite_number;
-    private java.lang.CharSequence favorite_color;
-
-    /** Creates a new Builder */
-    private Builder() {
-      super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
-    }
-    
-    /** Creates a Builder by copying an existing Builder */
-    private Builder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
-      super(other);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-    }
-    
-    /** Creates a Builder by copying an existing User instance */
-    private Builder(org.apache.flink.api.java.record.io.avro.example.User other) {
-            super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-    }
-
-    /** Gets the value of the 'name' field */
-    public java.lang.CharSequence getName() {
-      return name;
-    }
-    
-    /** Sets the value of the 'name' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder setName(java.lang.CharSequence value) {
-      validate(fields()[0], value);
-      this.name = value;
-      fieldSetFlags()[0] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'name' field has been set */
-    public boolean hasName() {
-      return fieldSetFlags()[0];
-    }
-    
-    /** Clears the value of the 'name' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder clearName() {
-      name = null;
-      fieldSetFlags()[0] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_number' field */
-    public java.lang.Integer getFavoriteNumber() {
-      return favorite_number;
-    }
-    
-    /** Sets the value of the 'favorite_number' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) {
-      validate(fields()[1], value);
-      this.favorite_number = value;
-      fieldSetFlags()[1] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_number' field has been set */
-    public boolean hasFavoriteNumber() {
-      return fieldSetFlags()[1];
-    }
-    
-    /** Clears the value of the 'favorite_number' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteNumber() {
-      favorite_number = null;
-      fieldSetFlags()[1] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_color' field */
-    public java.lang.CharSequence getFavoriteColor() {
-      return favorite_color;
-    }
-    
-    /** Sets the value of the 'favorite_color' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) {
-      validate(fields()[2], value);
-      this.favorite_color = value;
-      fieldSetFlags()[2] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_color' field has been set */
-    public boolean hasFavoriteColor() {
-      return fieldSetFlags()[2];
-    }
-    
-    /** Clears the value of the 'favorite_color' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteColor() {
-      favorite_color = null;
-      fieldSetFlags()[2] = false;
-      return this;
-    }
-
-    @Override
-    public User build() {
-      try {
-        User record = new User();
-        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
-        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
-        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
-        return record;
-      } catch (Exception e) {
-        throw new org.apache.avro.AvroRuntimeException(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/assembly/test-assembly.xml b/flink-addons/avro/src/test/assembly/test-assembly.xml
deleted file mode 100644
index 8316581..0000000
--- a/flink-addons/avro/src/test/assembly/test-assembly.xml
+++ /dev/null
@@ -1,31 +0,0 @@
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-
-<assembly>
-	<id>test-jar</id>
-	<formats>
-		<format>jar</format>
-	</formats>
-	<includeBaseDirectory>false</includeBaseDirectory>
-	<fileSets>
-		<fileSet>
-			<directory>${project.build.testOutputDirectory}</directory>
-			<outputDirectory>/</outputDirectory>
-			<!--modify/add include to match your package(s) -->
-			<includes>
-				<include>org/apache/flink/api/avro/testjar/**</include>
-			</includes>
-		</fileSet>
-	</fileSets>
-</assembly>
\ No newline at end of file