You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:39 UTC

[49/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc b/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc
deleted file mode 100644
index 02c11af..0000000
--- a/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc
+++ /dev/null
@@ -1,35 +0,0 @@
-[
-{"namespace": "org.apache.flink.api.io.avro.generated",
- "type": "record",
- "name": "Address",
- "fields": [
-     {"name": "num", "type": "int"},
-     {"name": "street", "type": "string"},
-     {"name": "city", "type": "string"},
-     {"name": "state", "type": "string"},
-     {"name": "zip", "type": "string"}
-  ]
-},
-{"namespace": "org.apache.flink.api.io.avro.generated",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]},
-     {"name": "type_long_test", "type": ["long", "null"]},
-     {"name": "type_double_test", "type": "double"},
-     {"name": "type_null_test", "type": ["null"]},
-     {"name": "type_bool_test", "type": ["boolean"]},
-     {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}},  
-     {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}}, 
-     {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null},
-     {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}},
-     {"name": "type_map", "type": {"type": "map", "values": "long"}},
-     {"name": "type_fixed",
-                 "size": 16,
-                 "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] },
-     {"name": "type_union", "type": ["null", "boolean", "long", "double"]},
-     {"name": "type_nested", "type": ["null", "Address"]}
- ]
-}]

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties b/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties
deleted file mode 100644
index 0b686e5..0000000
--- a/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=OFF, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml b/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml
deleted file mode 100644
index 8b3bb27..0000000
--- a/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro b/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro
deleted file mode 100644
index 45308b9..0000000
Binary files a/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml b/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
deleted file mode 100644
index 8f423d9..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
+++ /dev/null
@@ -1,182 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-	
-	<modelVersion>4.0.0</modelVersion>
-	
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-batch-connectors</artifactId>
-		<version>1.2-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-hadoop-compatibility_2.10</artifactId>
-	<name>flink-hadoop-compatibility</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-
-		<!-- core dependencies -->
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-shaded-hadoop2</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<!-- test dependencies -->
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-		
-	</dependencies>
-
-
-	<build>
-		<plugins>
-			<!-- activate API compatibility checks -->
-			<plugin>
-				<groupId>com.github.siom79.japicmp</groupId>
-				<artifactId>japicmp-maven-plugin</artifactId>
-			</plugin>
-			<!-- Scala Compiler -->
-			<plugin>
-				<groupId>net.alchim31.maven</groupId>
-				<artifactId>scala-maven-plugin</artifactId>
-				<version>3.1.4</version>
-				<executions>
-					<!-- Run scala compiler in the process-resources phase, so that dependencies on
-						scala classes can be resolved later in the (Java) compile phase -->
-					<execution>
-						<id>scala-compile-first</id>
-						<phase>process-resources</phase>
-						<goals>
-							<goal>compile</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<jvmArgs>
-						<jvmArg>-Xms128m</jvmArg>
-						<jvmArg>-Xmx512m</jvmArg>
-					</jvmArgs>
-				</configuration>
-			</plugin>
-
-			<!-- Eclipse Integration -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-eclipse-plugin</artifactId>
-				<version>2.8</version>
-				<configuration>
-					<downloadSources>true</downloadSources>
-					<projectnatures>
-						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
-					</projectnatures>
-					<buildcommands>
-						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-					</buildcommands>
-					<classpathContainers>
-						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-					</classpathContainers>
-					<excludes>
-						<exclude>org.scala-lang:scala-library</exclude>
-						<exclude>org.scala-lang:scala-compiler</exclude>
-					</excludes>
-					<sourceIncludes>
-						<sourceInclude>**/*.scala</sourceInclude>
-						<sourceInclude>**/*.java</sourceInclude>
-					</sourceIncludes>
-				</configuration>
-			</plugin>
-
-			<!-- Adding scala source directories to build path -->
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>build-helper-maven-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<!-- Add src/main/scala to eclipse build path -->
-					<execution>
-						<id>add-source</id>
-						<phase>generate-sources</phase>
-						<goals>
-							<goal>add-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/main/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<!-- Scala Code Style, most of the configuration done via plugin management -->
-			<plugin>
-				<groupId>org.scalastyle</groupId>
-				<artifactId>scalastyle-maven-plugin</artifactId>
-				<configuration>
-					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
deleted file mode 100644
index 7bcb4bf..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
-import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
-import org.apache.hadoop.io.Writable;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Type information for data types that extend Hadoop's {@link Writable} interface. The Writable
- * interface defines the serialization and deserialization routines for the data type.
- *
- * @param <T> The type of the class represented by this type information.
- */
-@Public
-public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private final Class<T> typeClass;
-
-	@PublicEvolving
-	public WritableTypeInfo(Class<T> typeClass) {
-		this.typeClass = checkNotNull(typeClass);
-
-		checkArgument(
-			Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class),
-			"WritableTypeInfo can only be used for subclasses of %s", Writable.class.getName());
-	}
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Override
-	@PublicEvolving
-	public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
-		if(Comparable.class.isAssignableFrom(typeClass)) {
-			return new WritableComparator(sortOrderAscending, typeClass);
-		}
-		else {
-			throw new UnsupportedOperationException("Cannot create Comparator for "+typeClass.getCanonicalName()+". " +
-													"Class does not implement Comparable interface.");
-		}
-	}
-
-	@Override
-	@PublicEvolving
-	public boolean isBasicType() {
-		return false;
-	}
-
-	@Override
-	@PublicEvolving
-	public boolean isTupleType() {
-		return false;
-	}
-
-	@Override
-	@PublicEvolving
-	public int getArity() {
-		return 1;
-	}
-	
-	@Override
-	@PublicEvolving
-	public int getTotalFields() {
-		return 1;
-	}
-
-	@Override
-	@PublicEvolving
-	public Class<T> getTypeClass() {
-		return this.typeClass;
-	}
-
-	@Override
-	@PublicEvolving
-	public boolean isKeyType() {
-		return Comparable.class.isAssignableFrom(typeClass);
-	}
-
-	@Override
-	@PublicEvolving
-	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
-		return new WritableSerializer<T>(typeClass);
-	}
-	
-	@Override
-	public String toString() {
-		return "WritableType<" + typeClass.getName() + ">";
-	}	
-	
-	@Override
-	public int hashCode() {
-		return typeClass.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof WritableTypeInfo) {
-			@SuppressWarnings("unchecked")
-			WritableTypeInfo<T> writableTypeInfo = (WritableTypeInfo<T>) obj;
-
-			return writableTypeInfo.canEqual(this) &&
-				typeClass == writableTypeInfo.typeClass;
-
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof WritableTypeInfo;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	@PublicEvolving
-	static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass) {
-		if (Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class)) {
-			return new WritableTypeInfo<T>(typeClass);
-		}
-		else {
-			throw new InvalidTypesException("The given class is no subclass of " + Writable.class.getName());
-		}
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
deleted file mode 100644
index 3a95d94..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.NormalizableKey;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-import java.io.IOException;
-
-public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private Class<T> type;
-	
-	private final boolean ascendingComparison;
-	
-	private transient T reference;
-	
-	private transient T tempReference;
-	
-	private transient Kryo kryo;
-
-	@SuppressWarnings("rawtypes")
-	private final TypeComparator[] comparators = new TypeComparator[] {this};
-
-	public WritableComparator(boolean ascending, Class<T> type) {
-		this.type = type;
-		this.ascendingComparison = ascending;
-	}
-	
-	@Override
-	public int hash(T record) {
-		return record.hashCode();
-	}
-	
-	@Override
-	public void setReference(T toCompare) {
-		checkKryoInitialized();
-
-		reference = KryoUtils.copy(toCompare, kryo, new WritableSerializer<T>(type));
-	}
-	
-	@Override
-	public boolean equalToReference(T candidate) {
-		return candidate.equals(reference);
-	}
-	
-	@Override
-	public int compareToReference(TypeComparator<T> referencedComparator) {
-		T otherRef = ((WritableComparator<T>) referencedComparator).reference;
-		int comp = otherRef.compareTo(reference);
-		return ascendingComparison ? comp : -comp;
-	}
-	
-	@Override
-	public int compare(T first, T second) {
-		int comp = first.compareTo(second);
-		return ascendingComparison ? comp : -comp;
-	}
-	
-	@Override
-	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
-		ensureReferenceInstantiated();
-		ensureTempReferenceInstantiated();
-		
-		reference.readFields(firstSource);
-		tempReference.readFields(secondSource);
-		
-		int comp = reference.compareTo(tempReference);
-		return ascendingComparison ? comp : -comp;
-	}
-	
-	@Override
-	public boolean supportsNormalizedKey() {
-		return NormalizableKey.class.isAssignableFrom(type);
-	}
-	
-	@Override
-	public int getNormalizeKeyLen() {
-		ensureReferenceInstantiated();
-		
-		NormalizableKey<?> key = (NormalizableKey<?>) reference;
-		return key.getMaxNormalizedKeyLen();
-	}
-	
-	@Override
-	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-		return keyBytes < getNormalizeKeyLen();
-	}
-	
-	@Override
-	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
-		NormalizableKey<?> key = (NormalizableKey<?>) record;
-		key.copyNormalizedKey(target, offset, numBytes);
-	}
-	
-	@Override
-	public boolean invertNormalizedKey() {
-		return !ascendingComparison;
-	}
-	
-	@Override
-	public TypeComparator<T> duplicate() {
-		return new WritableComparator<T>(ascendingComparison, type);
-	}
-
-	@Override
-	public int extractKeys(Object record, Object[] target, int index) {
-		target[index] = record;
-		return 1;
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public TypeComparator[] getFlatComparators() {
-		return comparators;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// unsupported normalization
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean supportsSerializationWithKeyNormalization() {
-		return false;
-	}
-	
-	@Override
-	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-	
-	@Override
-	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private void checkKryoInitialized() {
-		if (this.kryo == null) {
-			this.kryo = new Kryo();
-
-			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
-			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-			kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-			this.kryo.setAsmEnabled(true);
-			this.kryo.register(type);
-		}
-	}
-	
-	private void ensureReferenceInstantiated() {
-		if (reference == null) {
-			reference = InstantiationUtil.instantiate(type, Writable.class);
-		}
-	}
-	
-	private void ensureTempReferenceInstantiated() {
-		if (tempReference == null) {
-			tempReference = InstantiationUtil.instantiate(type, Writable.class);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
deleted file mode 100644
index 9036d75..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-import java.io.IOException;
-
-public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private final Class<T> typeClass;
-	
-	private transient Kryo kryo;
-	
-	private transient T copyInstance;
-	
-	public WritableSerializer(Class<T> typeClass) {
-		this.typeClass = typeClass;
-	}
-	
-	@SuppressWarnings("unchecked")
-	@Override
-	public T createInstance() {
-		if(typeClass == NullWritable.class) {
-			return (T) NullWritable.get();
-		}
-		return InstantiationUtil.instantiate(typeClass);
-	}
-
-
-	
-	@Override
-	public T copy(T from) {
-		checkKryoInitialized();
-
-		return KryoUtils.copy(from, kryo, this);
-	}
-	
-	@Override
-	public T copy(T from, T reuse) {
-		checkKryoInitialized();
-
-		return KryoUtils.copy(from, reuse, kryo, this);
-	}
-	
-	@Override
-	public int getLength() {
-		return -1;
-	}
-	
-	@Override
-	public void serialize(T record, DataOutputView target) throws IOException {
-		record.write(target);
-	}
-	
-	@Override
-	public T deserialize(DataInputView source) throws IOException {
-		return deserialize(createInstance(), source);
-	}
-	
-	@Override
-	public T deserialize(T reuse, DataInputView source) throws IOException {
-		reuse.readFields(source);
-		return reuse;
-	}
-	
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		ensureInstanceInstantiated();
-		copyInstance.readFields(source);
-		copyInstance.write(target);
-	}
-	
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-	
-	@Override
-	public WritableSerializer<T> duplicate() {
-		return new WritableSerializer<T>(typeClass);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private void ensureInstanceInstantiated() {
-		if (copyInstance == null) {
-			copyInstance = createInstance();
-		}
-	}
-	
-	private void checkKryoInitialized() {
-		if (this.kryo == null) {
-			this.kryo = new Kryo();
-
-			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
-			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-			kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-			this.kryo.setAsmEnabled(true);
-			this.kryo.register(typeClass);
-		}
-	}
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return this.typeClass.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof WritableSerializer) {
-			WritableSerializer<?> other = (WritableSerializer<?>) obj;
-
-			return other.canEqual(this) && typeClass == other.typeClass;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof WritableSerializer;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
deleted file mode 100644
index 9e8a3e4..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-
-import java.io.IOException;
-
-/**
- * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
- *
- * It provides methods to create Flink InputFormat wrappers for Hadoop {@link org.apache.hadoop.mapred.InputFormat}
- * and {@link org.apache.hadoop.mapreduce.InputFormat}.
- *
- * Key value pairs produced by the Hadoop InputFormats are converted into Flink
- * {@link org.apache.flink.api.java.tuple.Tuple2 Tuple2} objects where the first field
- * ({@link org.apache.flink.api.java.tuple.Tuple2#f0 Tuple2.f0}) is the key and the second field
- * ({@link org.apache.flink.api.java.tuple.Tuple2#f1 Tuple2.f1}) is the value.
- *
- */
-
-public final class HadoopInputs {
-	// ----------------------------------- Hadoop Input Format ---------------------------------------
-
-	/**
-	 * Wraps a Hadoop {@link org.apache.hadoop.mapred.FileInputFormat} (mapred API) into a Flink {@link InputFormat}
-	 * after adding the given path to the input paths of the provided {@link JobConf}.
-	 *
-	 * @param mapredInputFormat The Hadoop FileInputFormat to wrap.
-	 * @param key The class of the keys.
-	 * @param value The class of the values.
-	 * @param inputPath The path that is added as an input path to the JobConf.
-	 * @param job The JobConf that receives the input path and is passed on to the wrapper.
-	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
-	 */
-	public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
-		// register the input path in the JobConf before the format is wrapped
-		final org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(inputPath);
-		org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, hadoopPath);
-
-		return createHadoopInput(mapredInputFormat, key, value, job);
-	}
-
-	/**
-	 * Same as the five-argument mapred variant, but uses a newly created {@link JobConf}.
-	 *
-	 * @param mapredInputFormat The Hadoop FileInputFormat to wrap.
-	 * @param key The class of the keys.
-	 * @param value The class of the values.
-	 * @param inputPath The path that is added as an input path to the new JobConf.
-	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
-	 */
-	public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
-		final JobConf freshConf = new JobConf();
-		return readHadoopFile(mapredInputFormat, key, value, inputPath, freshConf);
-	}
-
-	/**
-	 * Builds a Flink {@link InputFormat} that reads a Hadoop sequence file with the given key and value classes.
-	 *
-	 * @param key The class of the keys.
-	 * @param value The class of the values.
-	 * @param inputPath The path of the sequence file input.
-	 * @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat.
-	 * @throws IOException Declared by the signature.
-	 */
-	public static <K,V> HadoopInputFormat<K, V> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
-		final org.apache.hadoop.mapred.SequenceFileInputFormat<K, V> sequenceFormat =
-				new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>();
-		return readHadoopFile(sequenceFormat, key, value, inputPath);
-	}
-
-	/**
-	 * Wraps an arbitrary Hadoop {@link org.apache.hadoop.mapred.InputFormat} (mapred API) into a Flink {@link InputFormat}.
-	 *
-	 * @param mapredInputFormat The Hadoop InputFormat to wrap.
-	 * @param key The class of the keys.
-	 * @param value The class of the values.
-	 * @param job The JobConf passed on to the wrapper.
-	 * @return A Flink InputFormat that wraps the Hadoop InputFormat.
-	 */
-	public static <K,V> HadoopInputFormat<K, V> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
-		final HadoopInputFormat<K, V> wrapped = new HadoopInputFormat<>(mapredInputFormat, key, value, job);
-		return wrapped;
-	}
-
-	/**
-	 * Wraps a Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat} (mapreduce API) into a Flink
-	 * {@link InputFormat} after adding the given path to the input paths of the provided {@link Job}.
-	 *
-	 * @param mapreduceInputFormat The Hadoop FileInputFormat to wrap.
-	 * @param key The class of the keys.
-	 * @param value The class of the values.
-	 * @param inputPath The path that is added as an input path to the Job.
-	 * @param job The Job that receives the input path and is passed on to the wrapper.
-	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
-	 * @throws IOException Declared by the signature; the call that adds the input path sits in this method.
-	 */
-	public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
-			org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException
-	{
-		// register the input path in the Job before the format is wrapped
-		final org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(inputPath);
-		org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, hadoopPath);
-
-		return createHadoopInput(mapreduceInputFormat, key, value, job);
-	}
-
-	/**
-	 * Same as the five-argument mapreduce variant, but uses the {@link Job} returned by {@link Job#getInstance()}.
-	 *
-	 * @param mapreduceInputFormat The Hadoop FileInputFormat to wrap.
-	 * @param key The class of the keys.
-	 * @param value The class of the values.
-	 * @param inputPath The path that is added as an input path to the Job.
-	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
-	 * @throws IOException Declared by the signature; the Job is obtained and the input path added from here.
-	 */
-	public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
-			org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException
-	{
-		final Job defaultJob = Job.getInstance();
-		return readHadoopFile(mapreduceInputFormat, key, value, inputPath, defaultJob);
-	}
-
-	/**
-	 * Wraps an arbitrary Hadoop {@link org.apache.hadoop.mapreduce.InputFormat} (mapreduce API) into a Flink {@link InputFormat}.
-	 *
-	 * @param mapreduceInputFormat The Hadoop InputFormat to wrap.
-	 * @param key The class of the keys.
-	 * @param value The class of the values.
-	 * @param job The Job passed on to the wrapper.
-	 * @return A Flink InputFormat that wraps the Hadoop InputFormat.
-	 */
-	public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
-			org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job)
-	{
-		final org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> wrapped =
-				new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
-		return wrapped;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
deleted file mode 100644
index 97ca329..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility;
-
-import org.apache.commons.cli.Option;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.hadoop.util.GenericOptionsParser;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Utility class to work with Apache Hadoop libraries.
- */
-public class HadoopUtils {
-	/**
-	 * Returns {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser}.
-	 *
-	 * <p>Every parsed option value of the form {@code key=value} becomes one entry of the returned
-	 * {@link ParameterTool}. The value is split at the first {@code '='} only, so values that themselves
-	 * contain {@code '='} (for example {@code url=host?a=b}) are kept intact, and {@code key=} maps the
-	 * key to the empty string. Options that carry no value, or whose value is not a {@code key=value}
-	 * pair, cannot be represented as a parameter and are skipped.
-	 *
-	 * @param args Input array arguments. It should be parsable by {@link GenericOptionsParser}
-	 * @return A {@link ParameterTool}
-	 * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser}
-	 * @see GenericOptionsParser
-	 */
-	public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException {
-		org.apache.commons.cli.CommandLine commandLine = new GenericOptionsParser(args).getCommandLine();
-		if (commandLine == null) {
-			// no command line is available, so there is nothing that could be turned into parameters
-			throw new IOException("Arguments could not be parsed by GenericOptionsParser.");
-		}
-
-		Map<String, String> map = new HashMap<String, String>();
-		for (Option option : commandLine.getOptions()) {
-			String keyValue = option.getValue();
-			if (keyValue == null) {
-				// option without an argument: nothing to extract
-				continue;
-			}
-
-			// limit 2: split at the first '=' only and keep a trailing empty value
-			String[] split = keyValue.split("=", 2);
-			if (split.length < 2) {
-				// not a key=value pair: skip instead of failing on split[1]
-				continue;
-			}
-			map.put(split[0], split[1]);
-		}
-		return ParameterTool.fromMap(map);
-	}
-}
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
deleted file mode 100644
index ba8aa90..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. 
- */
-@SuppressWarnings("rawtypes")
-@Public
-public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
-					extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>> 
-					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	// All fields are transient; mapper class and JobConf are handled by writeObject/readObject below.
-	private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper;
-	private transient JobConf jobConf;
-
-	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> outputCollector;
-	private transient Reporter reporter;
-	
-	/**
-	 * Wraps the given Hadoop Mapper (mapred API) using a newly created JobConf.
-	 * 
-	 * @param hadoopMapper The Hadoop Mapper to wrap.
-	 */
-	public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) {
-		this(hadoopMapper, new JobConf());
-	}
-	
-	/**
-	 * Wraps the given Hadoop Mapper (mapred API).
-	 * The Mapper is configured with the provided JobConf when the function is opened.
-	 * 
-	 * @param hadoopMapper The Hadoop Mapper to wrap.
-	 * @param conf The JobConf that is used to configure the Hadoop Mapper.
-	 * @throws NullPointerException If the Mapper or the JobConf is null.
-	 */
-	public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
-		if (hadoopMapper == null) {
-			throw new NullPointerException("Mapper may not be null.");
-		}
-		if (conf == null) {
-			throw new NullPointerException("JobConf may not be null.");
-		}
-
-		mapper = hadoopMapper;
-		jobConf = conf;
-	}
-
-	/**
-	 * Configures the wrapped Mapper with the JobConf and creates the reporter and output collector.
-	 */
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		mapper.configure(jobConf);
-
-		reporter = new HadoopDummyReporter();
-		outputCollector = new HadoopOutputCollector<>();
-	}
-
-	/**
-	 * Passes key (f0) and value (f1) of the input tuple to the wrapped Mapper;
-	 * the Mapper's output is forwarded to the given Flink collector.
-	 */
-	@Override
-	public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final Collector<Tuple2<KEYOUT,VALUEOUT>> out) 
-			throws Exception {
-		// the collector wrapper is reused; only its target changes per call
-		outputCollector.setFlinkCollector(out);
-		mapper.map(value.f0, value.f1, outputCollector, reporter);
-	}
-
-	/**
-	 * Derives the produced tuple type from type parameters 2 and 3 (output key and value)
-	 * of the wrapped Mapper's class.
-	 */
-	@SuppressWarnings("unchecked")
-	@Override
-	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {	
-		final Class<?> mapperType = mapper.getClass();
-		final Class<KEYOUT> keyOutClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapperType, 2);
-		final Class<VALUEOUT> valueOutClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Mapper.class, mapperType, 3);
-
-		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(keyOutClass);
-		final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass(valueOutClass);
-		return new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
-	}
-	
-	/**
-	 * Custom serialization: writes the class of the Mapper (not the Mapper instance) followed by the JobConf.
-	 * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
-	 */
-	private void writeObject(final ObjectOutputStream out) throws IOException {
-		final Class<?> mapperType = mapper.getClass();
-		out.writeObject(mapperType);
-		jobConf.write(out);
-	}
-
-	/**
-	 * Custom deserialization: instantiates a new Mapper from the written class and reads the JobConf back.
-	 */
-	@SuppressWarnings("unchecked")
-	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
-		final Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> mapperType =
-				(Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>) in.readObject();
-		mapper = InstantiationUtil.instantiate(mapperType);
-
-		final JobConf restoredConf = new JobConf();
-		restoredConf.readFields(in);
-		jobConf = restoredConf;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
deleted file mode 100644
index c1acc2b..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction.
- */
-@SuppressWarnings("rawtypes")
-@Public
-public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
-	extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
-	implements GroupCombineFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYIN,VALUEIN>>,
-				ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	// All fields are transient; reducer/combiner classes and JobConf are handled by writeObject/readObject below.
-	private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
-	private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner;
-	private transient JobConf jobConf;
-	
-	private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
-	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
-	private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
-	private transient Reporter reporter;
-
-	/**
-	 * Wraps two Hadoop Reducers (mapred API) as reducer and combiner, using a newly created JobConf.
-	 * 
-	 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
-	 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
-	 */
-	public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
-										Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) {
-		this(hadoopReducer, hadoopCombiner, new JobConf());
-	}
-	
-	/**
-	 * Wraps two Hadoop Reducers (mapred API) as reducer and combiner.
-	 * 
-	 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
-	 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
-	 * @param conf The JobConf that is used to configure both Hadoop Reducers.
-	 * @throws NullPointerException If the Reducer, the Combiner, or the JobConf is null.
-	 */
-	public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
-								Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
-		if (hadoopReducer == null) {
-			throw new NullPointerException("Reducer may not be null.");
-		}
-		if (hadoopCombiner == null) {
-			throw new NullPointerException("Combiner may not be null.");
-		}
-		if (conf == null) {
-			throw new NullPointerException("JobConf may not be null.");
-		}
-
-		reducer = hadoopReducer;
-		combiner = hadoopCombiner;
-		jobConf = conf;
-	}
-
-	/**
-	 * Configures reducer and combiner with the JobConf and creates the reporter, the value iterator
-	 * (with a key serializer derived from type parameter 0 of the Reducer's class) and both collectors.
-	 */
-	@SuppressWarnings("unchecked")
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		reducer.configure(jobConf);
-		combiner.configure(jobConf);
-
-		reporter = new HadoopDummyReporter();
-
-		final Class<KEYIN> keyInClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
-		final TypeInformation<KEYIN> keyInType = TypeExtractor.getForClass(keyInClass);
-		final TypeSerializer<KEYIN> keySerializer = keyInType.createSerializer(getRuntimeContext().getExecutionConfig());
-		valueIterator = new HadoopTupleUnwrappingIterator<>(keySerializer);
-
-		combineCollector = new HadoopOutputCollector<>();
-		reduceCollector = new HadoopOutputCollector<>();
-	}
-
-	/**
-	 * Hands the group to the wrapped Reducer; its output is forwarded to the given Flink collector.
-	 */
-	@Override
-	public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
-			throws Exception {
-		reduceCollector.setFlinkCollector(out);
-		valueIterator.set(values.iterator());
-
-		final KEYIN groupKey = valueIterator.getCurrentKey();
-		reducer.reduce(groupKey, valueIterator, reduceCollector, reporter);
-	}
-
-	/**
-	 * Hands the group to the wrapped Combiner; its output is forwarded to the given Flink collector.
-	 */
-	@Override
-	public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception {
-		combineCollector.setFlinkCollector(out);
-		valueIterator.set(values.iterator());
-
-		final KEYIN groupKey = valueIterator.getCurrentKey();
-		combiner.reduce(groupKey, valueIterator, combineCollector, reporter);
-	}
-
-	/**
-	 * Derives the produced tuple type from type parameters 2 and 3 (output key and value)
-	 * of the wrapped Reducer's class.
-	 */
-	@SuppressWarnings("unchecked")
-	@Override
-	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
-		final Class<?> reducerType = reducer.getClass();
-		final Class<KEYOUT> keyOutClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducerType, 2);
-		final Class<VALUEOUT> valueOutClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducerType, 3);
-
-		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(keyOutClass);
-		final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass(valueOutClass);
-		return new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
-	}
-
-	/**
-	 * Custom serialization: writes the classes of reducer and combiner (not the instances), then the JobConf.
-	 * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
-	 */
-	private void writeObject(final ObjectOutputStream out) throws IOException {
-		final Class<?> reducerType = reducer.getClass();
-		final Class<?> combinerType = combiner.getClass();
-
-		out.writeObject(reducerType);
-		out.writeObject(combinerType);
-		jobConf.write(out);
-	}
-
-	/**
-	 * Custom deserialization: instantiates reducer and combiner from the written classes (in that order)
-	 * and reads the JobConf back.
-	 */
-	@SuppressWarnings("unchecked")
-	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
-		final Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerType =
-				(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>) in.readObject();
-		reducer = InstantiationUtil.instantiate(reducerType);
-
-		final Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerType =
-				(Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>) in.readObject();
-		combiner = InstantiationUtil.instantiate(combinerType);
-
-		final JobConf restoredConf = new JobConf();
-		restoredConf.readFields(in);
-		jobConf = restoredConf;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
deleted file mode 100644
index 55aea24..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction. 
- */
-@SuppressWarnings("rawtypes")
-@Public
-public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
-					extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> 
-					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	// All fields are transient; reducer class and JobConf are handled by writeObject/readObject below.
-	private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
-	private transient JobConf jobConf;
-	
-	private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
-	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
-	private transient Reporter reporter;
-	
-	/**
-	 * Wraps the given Hadoop Reducer (mapred API) using a newly created JobConf.
-	 * 
-	 * @param hadoopReducer The Hadoop Reducer to wrap.
-	 */
-	public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer) {
-		this(hadoopReducer, new JobConf());
-	}
-	
-	/**
-	 * Wraps the given Hadoop Reducer (mapred API).
-	 * 
-	 * @param hadoopReducer The Hadoop Reducer to wrap.
-	 * @param conf The JobConf that is used to configure the Hadoop Reducer.
-	 * @throws NullPointerException If the Reducer or the JobConf is null.
-	 */
-	public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
-		if (hadoopReducer == null) {
-			throw new NullPointerException("Reducer may not be null.");
-		}
-		if (conf == null) {
-			throw new NullPointerException("JobConf may not be null.");
-		}
-
-		reducer = hadoopReducer;
-		jobConf = conf;
-	}
-
-	/**
-	 * Configures the wrapped Reducer with the JobConf and creates the reporter, the collector and the
-	 * value iterator (with a key serializer derived from type parameter 0 of the Reducer's class).
-	 */
-	@SuppressWarnings("unchecked")
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		reducer.configure(jobConf);
-
-		reporter = new HadoopDummyReporter();
-		reduceCollector = new HadoopOutputCollector<>();
-
-		final Class<KEYIN> keyInClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
-		final TypeInformation<KEYIN> keyInType = TypeExtractor.getForClass(keyInClass);
-		final TypeSerializer<KEYIN> keySerializer = keyInType.createSerializer(getRuntimeContext().getExecutionConfig());
-		valueIterator = new HadoopTupleUnwrappingIterator<>(keySerializer);
-	}
-
-	/**
-	 * Hands the group to the wrapped Reducer; its output is forwarded to the given Flink collector.
-	 */
-	@Override
-	public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
-			throws Exception {
-		reduceCollector.setFlinkCollector(out);
-		valueIterator.set(values.iterator());
-
-		final KEYIN groupKey = valueIterator.getCurrentKey();
-		reducer.reduce(groupKey, valueIterator, reduceCollector, reporter);
-	}
-
-	/**
-	 * Derives the produced tuple type from type parameters 2 and 3 (output key and value)
-	 * of the wrapped Reducer's class.
-	 */
-	@SuppressWarnings("unchecked")
-	@Override
-	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
-		final Class<?> reducerType = reducer.getClass();
-		final Class<KEYOUT> keyOutClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducerType, 2);
-		final Class<VALUEOUT> valueOutClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducerType, 3);
-
-		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(keyOutClass);
-		final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass(valueOutClass);
-		return new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
-	}
-
-	/**
-	 * Custom serialization: writes the class of the Reducer (not the Reducer instance) followed by the JobConf.
-	 * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
-	 */
-	private void writeObject(final ObjectOutputStream out) throws IOException {
-		final Class<?> reducerType = reducer.getClass();
-		out.writeObject(reducerType);
-		jobConf.write(out);
-	}
-
-	/**
-	 * Custom deserialization: instantiates a new Reducer from the written class and reads the JobConf back.
-	 */
-	@SuppressWarnings("unchecked")
-	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
-		final Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerType =
-				(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>) in.readObject();
-		reducer = InstantiationUtil.instantiate(reducerType);
-
-		final JobConf restoredConf = new JobConf();
-		restoredConf.readFields(in);
-		jobConf = restoredConf;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
deleted file mode 100644
index bfe03d3..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.mapred.OutputCollector;
-
-import java.io.IOException;
-
-/**
- * A Hadoop OutputCollector that wraps a Flink Collector.
- * On each call of collect() the data is forwarded to the wrapped Flink collector.
- */
-public final class HadoopOutputCollector<KEY,VALUE> implements OutputCollector<KEY,VALUE> {
-
-	/** Flink collector that receives every collected pair; assigned via {@link #setFlinkCollector}. */
-	private Collector<Tuple2<KEY,VALUE>> wrappedCollector;
-
-	/** Single tuple instance that is refilled and emitted on every {@link #collect} call. */
-	private final Tuple2<KEY,VALUE> reusedTuple = new Tuple2<>();
-
-	/**
-	 * Sets the Flink collector that subsequent {@link #collect} calls forward to.
-	 * 
-	 * @param flinkCollector The Flink Collector to forward to.
-	 */
-	public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector) {
-		wrappedCollector = flinkCollector;
-	}
-	
-	/**
-	 * Stores the key-value pair in the reused tuple and passes that tuple to the wrapped Flink collector.
-	 * 
-	 * @param key the key to collect
-	 * @param val the value to collect
-	 * @throws IOException declared by the signature; not thrown directly by this method's own code.
-	 */
-	@Override
-	public void collect(final KEY key, final VALUE val) throws IOException {
-		reusedTuple.f0 = key;
-		reusedTuple.f1 = val;
-		wrappedCollector.collect(reusedTuple);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
deleted file mode 100644
index 2d204b8..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field.
- */
-public class HadoopTupleUnwrappingIterator<KEY,VALUE> 
-		extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private final TypeSerializer<KEY> keySerializer;
-
-	private transient Iterator<Tuple2<KEY,VALUE>> iterator;
-	
-	private transient KEY curKey;
-	private transient VALUE firstValue;
-	private transient boolean atFirst;
-
-	public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer) {
-		this.keySerializer = checkNotNull(keySerializer);
-	}
-	
-	/**
-	 * Set the Flink iterator to wrap.
-	 * 
-	 * @param iterator The Flink iterator to wrap.
-	 */
-	@Override
-	public void set(final Iterator<Tuple2<KEY,VALUE>> iterator) {
-		this.iterator = iterator;
-		if(this.hasNext()) {
-			final Tuple2<KEY, VALUE> tuple = iterator.next();
-			this.curKey = keySerializer.copy(tuple.f0);
-			this.firstValue = tuple.f1;
-			this.atFirst = true;
-		} else {
-			this.atFirst = false;
-		}
-	}
-	
-	@Override
-	public boolean hasNext() {
-		if(this.atFirst) {
-			return true;
-		}
-		return iterator.hasNext();
-	}
-	
-	@Override
-	public VALUE next() {
-		if(this.atFirst) {
-			this.atFirst = false;
-			return firstValue;
-		}
-		
-		final Tuple2<KEY, VALUE> tuple = iterator.next();
-		return tuple.f1;
-	}
-	
-	public KEY getCurrentKey() {
-		return this.curKey;
-	}
-	
-	@Override
-	public void remove() {
-		throw new UnsupportedOperationException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
deleted file mode 100644
index 133a5f4..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.hadoopcompatibility.scala
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.hadoop.mapreduce
-import org.apache.flink.api.scala.hadoop.mapred
-import org.apache.hadoop.fs.{Path => HadoopPath}
-import org.apache.hadoop.mapred.{JobConf, FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
-import org.apache.hadoop.mapreduce.{Job, InputFormat => MapreduceInputFormat}
-
-/**
-  * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
-  *
-  * It provides methods to create Flink InputFormat wrappers for Hadoop
-  * [[org.apache.hadoop.mapred.InputFormat]] and [[org.apache.hadoop.mapreduce.InputFormat]].
-  *
-  * Key value pairs produced by the Hadoop InputFormats are converted into [[Tuple2]] where
-  * the first field is the key and the second field is the value.
-  *
-  */
-object HadoopInputs {
-
-  /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-    * [[org.apache.hadoop.mapred.FileInputFormat]].
-    */
-  def readHadoopFile[K, V](
-      mapredInputFormat: MapredFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String,
-      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
-
-    // set input path in JobConf
-    MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
-    // wrap mapredInputFormat
-    createHadoopInput(mapredInputFormat, key, value, job)
-  }
-
-  /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-    * [[org.apache.hadoop.mapred.FileInputFormat]].
-    */
-  def readHadoopFile[K, V](
-      mapredInputFormat: MapredFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
-
-    readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf)
-  }
-
-  /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that reads a Hadoop sequence
-    * file with the given key and value classes.
-    */
-  def readSequenceFile[K, V](
-      key: Class[K],
-      value: Class[V],
-      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
-
-    readHadoopFile(
-      new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V],
-      key,
-      value,
-      inputPath
-    )
-  }
-
-  /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-    * [[org.apache.hadoop.mapred.InputFormat]].
-    */
-  def createHadoopInput[K, V](
-      mapredInputFormat: MapredInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
-
-    new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
-  }
-
-  /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-    * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
-    */
-  def readHadoopFile[K, V](
-      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String,
-      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
-
-    // set input path in Job
-    MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
-    // wrap mapreduceInputFormat
-    createHadoopInput(mapreduceInputFormat, key, value, job)
-  }
-
-  /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-    * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
-    */
-  def readHadoopFile[K, V](
-      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] =
-  {
-    readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance)
-  }
-
-  /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-    * [[org.apache.hadoop.mapreduce.InputFormat]].
-    */
-  def createHadoopInput[K, V](
-      mapreduceInputFormat: MapreduceInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
-
-    new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, job)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
deleted file mode 100644
index 2aefd9f..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparator;
-
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings("serial")
-public class WritableExtractionTest {
-
-	@Test
-	public void testDetectWritable() {
-		// the Writable interface itself must not be detected as a Hadoop writable type
-		assertFalse(TypeExtractor.isHadoopWritable(Writable.class));
-
-		// various forms of extension
-		assertTrue(TypeExtractor.isHadoopWritable(DirectWritable.class));
-		assertTrue(TypeExtractor.isHadoopWritable(ViaInterfaceExtension.class));
-		assertTrue(TypeExtractor.isHadoopWritable(ViaAbstractClassExtension.class));
-
-		// some non-writables
-		assertFalse(TypeExtractor.isHadoopWritable(String.class));
-		assertFalse(TypeExtractor.isHadoopWritable(List.class));
-		assertFalse(TypeExtractor.isHadoopWritable(WritableComparator.class));
-	}
-
-	@Test
-	public void testCreateWritableInfo() {
-		TypeInformation<DirectWritable> info1 =
-				TypeExtractor.createHadoopWritableTypeInfo(DirectWritable.class);
-		assertEquals(DirectWritable.class, info1.getTypeClass());
-
-		TypeInformation<ViaInterfaceExtension> info2 =
-				TypeExtractor.createHadoopWritableTypeInfo(ViaInterfaceExtension.class);
-		assertEquals(ViaInterfaceExtension.class, info2.getTypeClass());
-
-		TypeInformation<ViaAbstractClassExtension> info3 = 
-				TypeExtractor.createHadoopWritableTypeInfo(ViaAbstractClassExtension.class);
-		assertEquals(ViaAbstractClassExtension.class, info3.getTypeClass());
-	}
-
-	@Test
-	public void testValidateTypeInfo() {
-		// validate unrelated type info
-		TypeExtractor.validateIfWritable(BasicTypeInfo.STRING_TYPE_INFO, String.class);
-
-		// validate writable type info correctly
-		TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
-				DirectWritable.class), DirectWritable.class);
-		TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
-				ViaInterfaceExtension.class), ViaInterfaceExtension.class);
-		TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
-				ViaAbstractClassExtension.class), ViaAbstractClassExtension.class);
-
-		// incorrect case: not writable at all
-		try {
-			TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
-					DirectWritable.class), String.class);
-			fail("should have failed with an exception");
-		} catch (InvalidTypesException e) {
-			// expected
-		}
-
-		// incorrect case: wrong writable
-		try {
-			TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
-					ViaInterfaceExtension.class), DirectWritable.class);
-			fail("should have failed with an exception");
-		} catch (InvalidTypesException e) {
-			// expected
-		}
-	}
-
-	@Test
-	public void testExtractFromFunction() {
-		RichMapFunction<DirectWritable, DirectWritable> function = new RichMapFunction<DirectWritable, DirectWritable>() {
-			@Override
-			public DirectWritable map(DirectWritable value) throws Exception {
-				return null;
-			}
-		};
-
-		TypeInformation<DirectWritable> outType = 
-				TypeExtractor.getMapReturnTypes(function, new WritableTypeInfo<>(DirectWritable.class));
-
-		assertTrue(outType instanceof WritableTypeInfo);
-		assertEquals(DirectWritable.class, outType.getTypeClass());
-	}
-
-	@Test
-	public void testExtractAsPartOfPojo() {
-		PojoTypeInfo<PojoWithWritable> pojoInfo = 
-				(PojoTypeInfo<PojoWithWritable>) TypeExtractor.getForClass(PojoWithWritable.class);
-
-		boolean foundWritable = false;
-		for (int i = 0; i < pojoInfo.getArity(); i++) {
-			PojoField field = pojoInfo.getPojoFieldAt(i);
-			String name = field.getField().getName();
-			
-			if (name.equals("hadoopCitizen")) {
-				if (foundWritable) {
-					fail("already seen");
-				}
-				foundWritable = true;
-				assertEquals(new WritableTypeInfo<>(DirectWritable.class), field.getTypeInformation());
-				assertEquals(DirectWritable.class, field.getTypeInformation().getTypeClass());
-				
-			}
-		}
-		
-		assertTrue("missed the writable type", foundWritable);
-	}
-
-	@Test
-	public void testInputValidationError() {
-
-		RichMapFunction<Writable, String> function = new RichMapFunction<Writable, String>() {
-			@Override
-			public String map(Writable value) throws Exception {
-				return null;
-			}
-		};
-
-		@SuppressWarnings("unchecked")
-		TypeInformation<Writable> inType = 
-				(TypeInformation<Writable>) (TypeInformation<?>) new WritableTypeInfo<>(DirectWritable.class);
-		
-		try {
-			TypeExtractor.getMapReturnTypes(function, inType);
-			fail("exception expected");
-		}
-		catch (InvalidTypesException e) {
-			// right
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  test type classes
-	// ------------------------------------------------------------------------
-
-	public interface ExtendedWritable extends Writable {}
-
-	public static abstract class AbstractWritable implements Writable {}
-
-	public static class DirectWritable implements Writable {
-
-		@Override
-		public void write(DataOutput dataOutput) throws IOException {}
-
-		@Override
-		public void readFields(DataInput dataInput) throws IOException {}
-	}
-
-	public static class ViaInterfaceExtension implements ExtendedWritable {
-
-		@Override
-		public void write(DataOutput dataOutput) throws IOException {}
-
-		@Override
-		public void readFields(DataInput dataInput) throws IOException {}
-	}
-
-	public static class ViaAbstractClassExtension extends AbstractWritable {
-
-		@Override
-		public void write(DataOutput dataOutput) throws IOException {}
-
-		@Override
-		public void readFields(DataInput dataInput) throws IOException {}
-	}
-
-	public static class PojoWithWritable {
-		public String str;
-		public DirectWritable hadoopCitizen;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
deleted file mode 100644
index 3d2b652..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.hadoop.io.Writable;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-public class WritableInfoParserTest {
-
-	@Test
-	public void testWritableType() {
-		TypeInformation<?> ti = TypeInfoParser.parse(
-				"Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>");
-
-		Assert.assertTrue(ti instanceof WritableTypeInfo<?>);
-		Assert.assertEquals(MyWritable.class, ((WritableTypeInfo<?>) ti).getTypeClass());
-	}
-
-	@Test
-	public void testPojoWithWritableType() {
-		TypeInformation<?> ti = TypeInfoParser.parse(
-				"org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyPojo<"
-				+ "basic=Integer,"
-				+ "tuple=Tuple2<String, Integer>,"
-				+ "hadoopCitizen=Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>,"
-				+ "array=String[]"
-				+ ">");
-		Assert.assertTrue(ti instanceof PojoTypeInfo);
-		PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
-		Assert.assertEquals("array", pti.getPojoFieldAt(0).getField().getName());
-		Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicArrayTypeInfo);
-		Assert.assertEquals("basic", pti.getPojoFieldAt(1).getField().getName());
-		Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() instanceof BasicTypeInfo);
-		Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).getField().getName());
-		Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() instanceof WritableTypeInfo);
-		Assert.assertEquals("tuple", pti.getPojoFieldAt(3).getField().getName());
-		Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() instanceof TupleTypeInfo);
-	}
-	// ------------------------------------------------------------------------
-	//  Test types
-	// ------------------------------------------------------------------------
-
-	public static class MyWritable implements Writable {
-
-		@Override
-		public void write(DataOutput out) throws IOException {}
-
-		@Override
-		public void readFields(DataInput in) throws IOException {}
-	}
-
-	public static class MyPojo {
-		public Integer basic;
-		public Tuple2<String, Integer> tuple;
-		public MyWritable hadoopCitizen;
-		public String[] array;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
deleted file mode 100644
index eb9cdf0..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.util.TestLogger;
-import org.apache.hadoop.io.Writable;
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-public class WritableTypeInfoTest extends TestLogger {
-	
-	@Test
-	public void testWritableTypeInfoEquality() {
-		WritableTypeInfo<TestClass> tpeInfo1 = new WritableTypeInfo<>(TestClass.class);
-		WritableTypeInfo<TestClass> tpeInfo2 = new WritableTypeInfo<>(TestClass.class);
-
-		assertEquals(tpeInfo1, tpeInfo2);
-		assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
-	}
-
-	@Test
-	public void testWritableTypeInfoInequality() {
-		WritableTypeInfo<TestClass> tpeInfo1 = new WritableTypeInfo<>(TestClass.class);
-		WritableTypeInfo<AlternateClass> tpeInfo2 = new WritableTypeInfo<>(AlternateClass.class);
-
-		assertNotEquals(tpeInfo1, tpeInfo2);
-	}
-
-	// ------------------------------------------------------------------------
-	//  test types
-	// ------------------------------------------------------------------------
-
-	public static class TestClass implements Writable {
-
-		@Override
-		public void write(DataOutput dataOutput) throws IOException {}
-
-		@Override
-		public void readFields(DataInput dataInput) throws IOException {}
-	}
-
-	public static class AlternateClass implements Writable {
-
-		@Override
-		public void write(DataOutput dataOutput) throws IOException {}
-
-		@Override
-		public void readFields(DataInput dataInput) throws IOException {}
-	}
-}