You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:12 UTC

[22/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
new file mode 100644
index 0000000..3a95d94
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.Writable;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.IOException;
+
+public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
+	
+	private static final long serialVersionUID = 1L;
+	
+	private Class<T> type;
+	
+	private final boolean ascendingComparison;
+	
+	private transient T reference;
+	
+	private transient T tempReference;
+	
+	private transient Kryo kryo;
+
+	@SuppressWarnings("rawtypes")
+	private final TypeComparator[] comparators = new TypeComparator[] {this};
+
+	public WritableComparator(boolean ascending, Class<T> type) {
+		this.type = type;
+		this.ascendingComparison = ascending;
+	}
+	
+	@Override
+	public int hash(T record) {
+		return record.hashCode();
+	}
+	
+	@Override
+	public void setReference(T toCompare) {
+		checkKryoInitialized();
+
+		reference = KryoUtils.copy(toCompare, kryo, new WritableSerializer<T>(type));
+	}
+	
+	@Override
+	public boolean equalToReference(T candidate) {
+		return candidate.equals(reference);
+	}
+	
+	@Override
+	public int compareToReference(TypeComparator<T> referencedComparator) {
+		T otherRef = ((WritableComparator<T>) referencedComparator).reference;
+		int comp = otherRef.compareTo(reference);
+		return ascendingComparison ? comp : -comp;
+	}
+	
+	@Override
+	public int compare(T first, T second) {
+		int comp = first.compareTo(second);
+		return ascendingComparison ? comp : -comp;
+	}
+	
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		ensureReferenceInstantiated();
+		ensureTempReferenceInstantiated();
+		
+		reference.readFields(firstSource);
+		tempReference.readFields(secondSource);
+		
+		int comp = reference.compareTo(tempReference);
+		return ascendingComparison ? comp : -comp;
+	}
+	
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(type);
+	}
+	
+	@Override
+	public int getNormalizeKeyLen() {
+		ensureReferenceInstantiated();
+		
+		NormalizableKey<?> key = (NormalizableKey<?>) reference;
+		return key.getMaxNormalizedKeyLen();
+	}
+	
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+	
+	@Override
+	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
+		NormalizableKey<?> key = (NormalizableKey<?>) record;
+		key.copyNormalizedKey(target, offset, numBytes);
+	}
+	
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+	
+	@Override
+	public TypeComparator<T> duplicate() {
+		return new WritableComparator<T>(ascendingComparison, type);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public TypeComparator[] getFlatComparators() {
+		return comparators;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// unsupported normalization
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+	
+	@Override
+	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+	
+	@Override
+	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private void checkKryoInitialized() {
+		if (this.kryo == null) {
+			this.kryo = new Kryo();
+
+			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
+			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+			kryo.setInstantiatorStrategy(instantiatorStrategy);
+
+			this.kryo.setAsmEnabled(true);
+			this.kryo.register(type);
+		}
+	}
+	
+	private void ensureReferenceInstantiated() {
+		if (reference == null) {
+			reference = InstantiationUtil.instantiate(type, Writable.class);
+		}
+	}
+	
+	private void ensureTempReferenceInstantiated() {
+		if (tempReference == null) {
+			tempReference = InstantiationUtil.instantiate(type, Writable.class);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
new file mode 100644
index 0000000..9036d75
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+
+import com.esotericsoftware.kryo.Kryo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.IOException;
+
+public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
+	
+	private static final long serialVersionUID = 1L;
+	
+	private final Class<T> typeClass;
+	
+	private transient Kryo kryo;
+	
+	private transient T copyInstance;
+	
+	public WritableSerializer(Class<T> typeClass) {
+		this.typeClass = typeClass;
+	}
+	
+	@SuppressWarnings("unchecked")
+	@Override
+	public T createInstance() {
+		if(typeClass == NullWritable.class) {
+			return (T) NullWritable.get();
+		}
+		return InstantiationUtil.instantiate(typeClass);
+	}
+
+
+	
+	@Override
+	public T copy(T from) {
+		checkKryoInitialized();
+
+		return KryoUtils.copy(from, kryo, this);
+	}
+	
+	@Override
+	public T copy(T from, T reuse) {
+		checkKryoInitialized();
+
+		return KryoUtils.copy(from, reuse, kryo, this);
+	}
+	
+	@Override
+	public int getLength() {
+		return -1;
+	}
+	
+	@Override
+	public void serialize(T record, DataOutputView target) throws IOException {
+		record.write(target);
+	}
+	
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		return deserialize(createInstance(), source);
+	}
+	
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		reuse.readFields(source);
+		return reuse;
+	}
+	
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		ensureInstanceInstantiated();
+		copyInstance.readFields(source);
+		copyInstance.write(target);
+	}
+	
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+	
+	@Override
+	public WritableSerializer<T> duplicate() {
+		return new WritableSerializer<T>(typeClass);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private void ensureInstanceInstantiated() {
+		if (copyInstance == null) {
+			copyInstance = createInstance();
+		}
+	}
+	
+	private void checkKryoInitialized() {
+		if (this.kryo == null) {
+			this.kryo = new Kryo();
+
+			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
+			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+			kryo.setInstantiatorStrategy(instantiatorStrategy);
+
+			this.kryo.setAsmEnabled(true);
+			this.kryo.register(typeClass);
+		}
+	}
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		return this.typeClass.hashCode();
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof WritableSerializer) {
+			WritableSerializer<?> other = (WritableSerializer<?>) obj;
+
+			return other.canEqual(this) && typeClass == other.typeClass;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof WritableSerializer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
new file mode 100644
index 0000000..9e8a3e4
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+
+/**
+ * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
+ *
+ * It provides methods to create Flink InputFormat wrappers for Hadoop {@link org.apache.hadoop.mapred.InputFormat}
+ * and {@link org.apache.hadoop.mapreduce.InputFormat}.
+ *
+ * Key value pairs produced by the Hadoop InputFormats are converted into Flink
+ * {@link org.apache.flink.api.java.tuple.Tuple2 Tuple2} objects where the first field
+ * ({@link org.apache.flink.api.java.tuple.Tuple2#f0 Tuple2.f0}) is the key and the second field
+ * ({@link org.apache.flink.api.java.tuple.Tuple2#f1 Tuple2.f1}) is the value.
+ *
+ */
+
+public final class HadoopInputs {
+	// ----------------------------------- Hadoop Input Format ---------------------------------------
+
+	/**
+	 * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat}.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+	 */
+	public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
+		// set input path in JobConf
+		org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
+		// return wrapping InputFormat
+		return createHadoopInput(mapredInputFormat, key, value, job);
+	}
+
+	/**
+	 * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat}.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+	 */
+	public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
+		return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
+	}
+
+	/**
+	 * Creates a Flink {@link InputFormat} to read a Hadoop sequence file for the given key and value classes.
+	 *
+	 * @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat.
+	 */
+	public static <K,V> HadoopInputFormat<K, V> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
+		return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
+	}
+
+	/**
+	 * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.InputFormat}.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop InputFormat.
+	 */
+	public static <K,V> HadoopInputFormat<K, V> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+		return new HadoopInputFormat<>(mapredInputFormat, key, value, job);
+	}
+
+	/**
+	 * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+	 */
+	public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
+			org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException
+	{
+		// set input path in Job
+		org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
+		// return wrapping InputFormat
+		return createHadoopInput(mapreduceInputFormat, key, value, job);
+	}
+
+	/**
+	 * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+	 */
+	public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
+			org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException
+	{
+		return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
+	}
+
+	/**
+	 * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.InputFormat}.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop InputFormat.
+	 */
+	public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
+			org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job)
+	{
+		return new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
new file mode 100644
index 0000000..97ca329
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.commons.cli.Option;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class to work with Apache Hadoop libraries.
+ */
+public class HadoopUtils {
+	/**
+	 * Returns {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser}
+	 *
+	 * @param args Input array arguments. It should be parsable by {@link GenericOptionsParser}
+	 * @return A {@link ParameterTool}
+	 * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser}
+	 * @see GenericOptionsParser
+	 */
+	public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException {
+		Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions();
+		Map<String, String> map = new HashMap<String, String>();
+		for (Option option : options) {
+			String[] split = option.getValue().split("=");
+			map.put(split[0], split[1]);
+		}
+		return ParameterTool.fromMap(map);
+	}
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
new file mode 100644
index 0000000..ba8aa90
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. 
+ */
+@SuppressWarnings("rawtypes")
+@Public
+public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+					extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>> 
+					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper;
+	private transient JobConf jobConf;
+
+	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> outputCollector;
+	private transient Reporter reporter;
+	
+	/**
+	 * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
+	 * 
+	 * @param hadoopMapper The Hadoop Mapper to wrap.
+	 */
+	public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) {
+		this(hadoopMapper, new JobConf());
+	}
+	
+	/**
+	 * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
+	 * The Hadoop Mapper is configured with the provided JobConf.
+	 * 
+	 * @param hadoopMapper The Hadoop Mapper to wrap.
+	 * @param conf The JobConf that is used to configure the Hadoop Mapper.
+	 */
+	public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
+		if(hadoopMapper == null) {
+			throw new NullPointerException("Mapper may not be null.");
+		}
+		if(conf == null) {
+			throw new NullPointerException("JobConf may not be null.");
+		}
+		
+		this.mapper = hadoopMapper;
+		this.jobConf = conf;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.mapper.configure(jobConf);
+		
+		this.reporter = new HadoopDummyReporter();
+		this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
+	}
+
+	@Override
+	public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final Collector<Tuple2<KEYOUT,VALUEOUT>> out) 
+			throws Exception {
+		outputCollector.setFlinkCollector(out);
+		mapper.map(value.f0, value.f1, outputCollector, reporter);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {	
+		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
+		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
+		
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
+		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
+	}
+	
+	/**
+	 * Custom serialization methods.
+	 * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
+	 */
+	private void writeObject(final ObjectOutputStream out) throws IOException {
+		out.writeObject(mapper.getClass());
+		jobConf.write(out);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+		Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> mapperClass = 
+				(Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+		mapper = InstantiationUtil.instantiate(mapperClass);
+		
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
new file mode 100644
index 0000000..c1acc2b
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction.
+ */
+@SuppressWarnings("rawtypes")
+@Public
+public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+	extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
+	implements GroupCombineFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYIN,VALUEIN>>,
+				ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
+	private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner;
+	private transient JobConf jobConf;
+	
+	private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
+	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
+	private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
+	private transient Reporter reporter;
+
+	/**
+	 * Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
+	 * 
+	 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
+	 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
+	 */
+	public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
+										Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) {
+		this(hadoopReducer, hadoopCombiner, new JobConf());
+	}
+	
+	/**
+	 * Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
+	 * 
+	 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
+	 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
+	 * @param conf The JobConf that is used to configure both Hadoop Reducers.
+	 */
+	public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
+								Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
+		if(hadoopReducer == null) {
+			throw new NullPointerException("Reducer may not be null.");
+		}
+		if(hadoopCombiner == null) {
+			throw new NullPointerException("Combiner may not be null.");
+		}
+		if(conf == null) {
+			throw new NullPointerException("JobConf may not be null.");
+		}
+		
+		this.reducer = hadoopReducer;
+		this.combiner = hadoopCombiner;
+		this.jobConf = conf;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.reducer.configure(jobConf);
+		this.combiner.configure(jobConf);
+		
+		this.reporter = new HadoopDummyReporter();
+		Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
+		TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
+		this.valueIterator = new HadoopTupleUnwrappingIterator<>(keySerializer);
+		this.combineCollector = new HadoopOutputCollector<>();
+		this.reduceCollector = new HadoopOutputCollector<>();
+	}
+
+	@Override
+	public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+			throws Exception {
+		reduceCollector.setFlinkCollector(out);
+		valueIterator.set(values.iterator());
+		reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
+	}
+
+	@Override
+	public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception {
+		combineCollector.setFlinkCollector(out);
+		valueIterator.set(values.iterator());
+		combiner.reduce(valueIterator.getCurrentKey(), valueIterator, combineCollector, reporter);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
+		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass(outValClass);
+		return new TupleTypeInfo<>(keyTypeInfo, valueTypleInfo);
+	}
+
+	/**
+	 * Custom serialization methods.
+	 * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
+	 */
+	private void writeObject(final ObjectOutputStream out) throws IOException {
+		
+		out.writeObject(reducer.getClass());
+		out.writeObject(combiner.getClass());
+		jobConf.write(out);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+		
+		Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = 
+				(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+		reducer = InstantiationUtil.instantiate(reducerClass);
+		
+		Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass = 
+				(Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
+		combiner = InstantiationUtil.instantiate(combinerClass);
+		
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
new file mode 100644
index 0000000..55aea24
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction. 
+ */
+@SuppressWarnings("rawtypes")
+@Public
+public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+					extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> 
+					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
+	private transient JobConf jobConf;
+	
+	private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
+	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
+	private transient Reporter reporter;
+	
+	/**
+	 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+ 	 * 
+	 * @param hadoopReducer The Hadoop Reducer to wrap.
+	 */
+	public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer) {
+		this(hadoopReducer, new JobConf());
+	}
+	
+	/**
+	 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+ 	 * 
+	 * @param hadoopReducer The Hadoop Reducer to wrap.
+	 * @param conf The JobConf that is used to configure the Hadoop Reducer.
+	 */
+	public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
+		if(hadoopReducer == null) {
+			throw new NullPointerException("Reducer may not be null.");
+		}
+		if(conf == null) {
+			throw new NullPointerException("JobConf may not be null.");
+		}
+		
+		this.reducer = hadoopReducer;
+		this.jobConf = conf;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.reducer.configure(jobConf);
+		
+		this.reporter = new HadoopDummyReporter();
+		this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
+		Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
+		TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
+		this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer);
+	}
+
+	@Override
+	public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+			throws Exception {
+		
+		reduceCollector.setFlinkCollector(out);
+		valueIterator.set(values.iterator());
+		reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
+		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
+		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
+	}
+
+	/**
+	 * Custom serialization methods
+	 * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
+	 */
+	private void writeObject(final ObjectOutputStream out) throws IOException {
+		
+		out.writeObject(reducer.getClass());
+		jobConf.write(out);		
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+		
+		Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = 
+				(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+		reducer = InstantiationUtil.instantiate(reducerClass);
+		
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
new file mode 100644
index 0000000..bfe03d3
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import java.io.IOException;
+
+/**
+ * A Hadoop OutputCollector that wraps a Flink OutputCollector.
+ * On each call of collect() the data is forwarded to the wrapped Flink collector.
+ */
+public final class HadoopOutputCollector<KEY,VALUE> implements OutputCollector<KEY,VALUE> {
+
+	private Collector<Tuple2<KEY,VALUE>> flinkCollector;
+
+	private final Tuple2<KEY,VALUE> outTuple = new Tuple2<KEY, VALUE>();
+
+	/**
+	 * Set the wrapped Flink collector.
+	 * 
+	 * @param flinkCollector The wrapped Flink OutputCollector.
+	 */
+	public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector) {
+		this.flinkCollector = flinkCollector;
+	}
+	
+	/**
+	 * Use the wrapped Flink collector to collect a key-value pair for Flink. 
+	 * 
+	 * @param key the key to collect
+	 * @param val the value to collect
+	 * @throws IOException unexpected of key or value in key-value pair.
+	 */
+	@Override
+	public void collect(final KEY key, final VALUE val) throws IOException {
+		this.outTuple.f0 = key;
+		this.outTuple.f1 = val;
+		this.flinkCollector.collect(outTuple);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
new file mode 100644
index 0000000..2d204b8
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field.
+ */
+public class HadoopTupleUnwrappingIterator<KEY,VALUE> 
+		extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final TypeSerializer<KEY> keySerializer;
+
+	private transient Iterator<Tuple2<KEY,VALUE>> iterator;
+	
+	private transient KEY curKey;
+	private transient VALUE firstValue;
+	private transient boolean atFirst;
+
+	public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer) {
+		this.keySerializer = checkNotNull(keySerializer);
+	}
+	
+	/**
+	 * Set the Flink iterator to wrap.
+	 * 
+	 * @param iterator The Flink iterator to wrap.
+	 */
+	@Override
+	public void set(final Iterator<Tuple2<KEY,VALUE>> iterator) {
+		this.iterator = iterator;
+		if(this.hasNext()) {
+			final Tuple2<KEY, VALUE> tuple = iterator.next();
+			this.curKey = keySerializer.copy(tuple.f0);
+			this.firstValue = tuple.f1;
+			this.atFirst = true;
+		} else {
+			this.atFirst = false;
+		}
+	}
+	
+	@Override
+	public boolean hasNext() {
+		if(this.atFirst) {
+			return true;
+		}
+		return iterator.hasNext();
+	}
+	
+	@Override
+	public VALUE next() {
+		if(this.atFirst) {
+			this.atFirst = false;
+			return firstValue;
+		}
+		
+		final Tuple2<KEY, VALUE> tuple = iterator.next();
+		return tuple.f1;
+	}
+	
+	public KEY getCurrentKey() {
+		return this.curKey;
+	}
+	
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
new file mode 100644
index 0000000..133a5f4
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.hadoopcompatibility.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.hadoop.mapreduce
+import org.apache.flink.api.scala.hadoop.mapred
+import org.apache.hadoop.fs.{Path => HadoopPath}
+import org.apache.hadoop.mapred.{JobConf, FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
+import org.apache.hadoop.mapreduce.{Job, InputFormat => MapreduceInputFormat}
+
+/**
+  * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
+  *
+  * It provides methods to create Flink InputFormat wrappers for Hadoop
+  * [[org.apache.hadoop.mapred.InputFormat]] and [[org.apache.hadoop.mapreduce.InputFormat]].
+  *
+  * Key value pairs produced by the Hadoop InputFormats are converted into [[Tuple2]] where
+  * the first field is the key and the second field is the value.
+  *
+  */
+object HadoopInputs {
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+    * [[org.apache.hadoop.mapred.FileInputFormat]].
+    */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapredFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String,
+      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+    // set input path in JobConf
+    MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
+    // wrap mapredInputFormat
+    createHadoopInput(mapredInputFormat, key, value, job)
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+    * [[org.apache.hadoop.mapred.FileInputFormat]].
+    */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapredFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+    readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf)
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that reads a Hadoop sequence
+    * file with the given key and value classes.
+    */
+  def readSequenceFile[K, V](
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+    readHadoopFile(
+      new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V],
+      key,
+      value,
+      inputPath
+    )
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+    * [[org.apache.hadoop.mapred.InputFormat]].
+    */
+  def createHadoopInput[K, V](
+      mapredInputFormat: MapredInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+    new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+    * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
+    */
+  def readHadoopFile[K, V](
+      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String,
+      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
+
+    // set input path in Job
+    MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
+    // wrap mapreduceInputFormat
+    createHadoopInput(mapreduceInputFormat, key, value, job)
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+    * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
+    */
+  def readHadoopFile[K, V](
+      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] =
+  {
+    readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance)
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+    * [[org.apache.hadoop.mapreduce.InputFormat]].
+    */
+  def createHadoopInput[K, V](
+      mapreduceInputFormat: MapreduceInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
+
+    new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, job)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
new file mode 100644
index 0000000..2aefd9f
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparator;
+
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class WritableExtractionTest {
+
+	@Test
+	public void testDetectWritable() {
+		// writable interface itself must not be writable
+		assertFalse(TypeExtractor.isHadoopWritable(Writable.class));
+
+		// various forms of extension
+		assertTrue(TypeExtractor.isHadoopWritable(DirectWritable.class));
+		assertTrue(TypeExtractor.isHadoopWritable(ViaInterfaceExtension.class));
+		assertTrue(TypeExtractor.isHadoopWritable(ViaAbstractClassExtension.class));
+
+		// some non-writables
+		assertFalse(TypeExtractor.isHadoopWritable(String.class));
+		assertFalse(TypeExtractor.isHadoopWritable(List.class));
+		assertFalse(TypeExtractor.isHadoopWritable(WritableComparator.class));
+	}
+
+	@Test
+	public void testCreateWritableInfo() {
+		TypeInformation<DirectWritable> info1 =
+				TypeExtractor.createHadoopWritableTypeInfo(DirectWritable.class);
+		assertEquals(DirectWritable.class, info1.getTypeClass());
+
+		TypeInformation<ViaInterfaceExtension> info2 =
+				TypeExtractor.createHadoopWritableTypeInfo(ViaInterfaceExtension.class);
+		assertEquals(ViaInterfaceExtension.class, info2.getTypeClass());
+
+		TypeInformation<ViaAbstractClassExtension> info3 = 
+				TypeExtractor.createHadoopWritableTypeInfo(ViaAbstractClassExtension.class);
+		assertEquals(ViaAbstractClassExtension.class, info3.getTypeClass());
+	}
+
+	@Test
+	public void testValidateTypeInfo() {
+		// validate unrelated type info
+		TypeExtractor.validateIfWritable(BasicTypeInfo.STRING_TYPE_INFO, String.class);
+
+		// validate writable type info correctly
+		TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
+				DirectWritable.class), DirectWritable.class);
+		TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
+				ViaInterfaceExtension.class), ViaInterfaceExtension.class);
+		TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
+				ViaAbstractClassExtension.class), ViaAbstractClassExtension.class);
+
+		// incorrect case: not writable at all
+		try {
+			TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
+					DirectWritable.class), String.class);
+			fail("should have failed with an exception");
+		} catch (InvalidTypesException e) {
+			// expected
+		}
+
+		// incorrect case: wrong writable
+		try {
+			TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
+					ViaInterfaceExtension.class), DirectWritable.class);
+			fail("should have failed with an exception");
+		} catch (InvalidTypesException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testExtractFromFunction() {
+		RichMapFunction<DirectWritable, DirectWritable> function = new RichMapFunction<DirectWritable, DirectWritable>() {
+			@Override
+			public DirectWritable map(DirectWritable value) throws Exception {
+				return null;
+			}
+		};
+
+		TypeInformation<DirectWritable> outType = 
+				TypeExtractor.getMapReturnTypes(function, new WritableTypeInfo<>(DirectWritable.class));
+
+		assertTrue(outType instanceof WritableTypeInfo);
+		assertEquals(DirectWritable.class, outType.getTypeClass());
+	}
+
+	@Test
+	public void testExtractAsPartOfPojo() {
+		PojoTypeInfo<PojoWithWritable> pojoInfo = 
+				(PojoTypeInfo<PojoWithWritable>) TypeExtractor.getForClass(PojoWithWritable.class);
+
+		boolean foundWritable = false;
+		for (int i = 0; i < pojoInfo.getArity(); i++) {
+			PojoField field = pojoInfo.getPojoFieldAt(i);
+			String name = field.getField().getName();
+			
+			if (name.equals("hadoopCitizen")) {
+				if (foundWritable) {
+					fail("already seen");
+				}
+				foundWritable = true;
+				assertEquals(new WritableTypeInfo<>(DirectWritable.class), field.getTypeInformation());
+				assertEquals(DirectWritable.class, field.getTypeInformation().getTypeClass());
+				
+			}
+		}
+		
+		assertTrue("missed the writable type", foundWritable);
+	}
+
+	@Test
+	public void testInputValidationError() {
+
+		RichMapFunction<Writable, String> function = new RichMapFunction<Writable, String>() {
+			@Override
+			public String map(Writable value) throws Exception {
+				return null;
+			}
+		};
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<Writable> inType = 
+				(TypeInformation<Writable>) (TypeInformation<?>) new WritableTypeInfo<>(DirectWritable.class);
+		
+		try {
+			TypeExtractor.getMapReturnTypes(function, inType);
+			fail("exception expected");
+		}
+		catch (InvalidTypesException e) {
+			// right
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test type classes
+	// ------------------------------------------------------------------------
+
+	public interface ExtendedWritable extends Writable {}
+
+	public static abstract class AbstractWritable implements Writable {}
+
+	public static class DirectWritable implements Writable {
+
+		@Override
+		public void write(DataOutput dataOutput) throws IOException {}
+
+		@Override
+		public void readFields(DataInput dataInput) throws IOException {}
+	}
+
+	public static class ViaInterfaceExtension implements ExtendedWritable {
+
+		@Override
+		public void write(DataOutput dataOutput) throws IOException {}
+
+		@Override
+		public void readFields(DataInput dataInput) throws IOException {}
+	}
+
+	public static class ViaAbstractClassExtension extends AbstractWritable {
+
+		@Override
+		public void write(DataOutput dataOutput) throws IOException {}
+
+		@Override
+		public void readFields(DataInput dataInput) throws IOException {}
+	}
+
+	public static class PojoWithWritable {
+		public String str;
+		public DirectWritable hadoopCitizen;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
new file mode 100644
index 0000000..3d2b652
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.io.Writable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class WritableInfoParserTest {
+
+	@Test
+	public void testWritableType() {
+		TypeInformation<?> ti = TypeInfoParser.parse(
+				"Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>");
+
+		Assert.assertTrue(ti instanceof WritableTypeInfo<?>);
+		Assert.assertEquals(MyWritable.class, ((WritableTypeInfo<?>) ti).getTypeClass());
+	}
+
+	@Test
+	public void testPojoWithWritableType() {
+		TypeInformation<?> ti = TypeInfoParser.parse(
+				"org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyPojo<"
+				+ "basic=Integer,"
+				+ "tuple=Tuple2<String, Integer>,"
+				+ "hadoopCitizen=Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>,"
+				+ "array=String[]"
+				+ ">");
+		Assert.assertTrue(ti instanceof PojoTypeInfo);
+		PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
+		Assert.assertEquals("array", pti.getPojoFieldAt(0).getField().getName());
+		Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicArrayTypeInfo);
+		Assert.assertEquals("basic", pti.getPojoFieldAt(1).getField().getName());
+		Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() instanceof BasicTypeInfo);
+		Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).getField().getName());
+		Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() instanceof WritableTypeInfo);
+		Assert.assertEquals("tuple", pti.getPojoFieldAt(3).getField().getName());
+		Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() instanceof TupleTypeInfo);
+	}
+	// ------------------------------------------------------------------------
+	//  Test types
+	// ------------------------------------------------------------------------
+
+	public static class MyWritable implements Writable {
+
+		@Override
+		public void write(DataOutput out) throws IOException {}
+
+		@Override
+		public void readFields(DataInput in) throws IOException {}
+	}
+
+	public static class MyPojo {
+		public Integer basic;
+		public Tuple2<String, Integer> tuple;
+		public MyWritable hadoopCitizen;
+		public String[] array;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
new file mode 100644
index 0000000..eb9cdf0
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.util.TestLogger;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class WritableTypeInfoTest extends TestLogger {
+	
+	@Test
+	public void testWritableTypeInfoEquality() {
+		WritableTypeInfo<TestClass> tpeInfo1 = new WritableTypeInfo<>(TestClass.class);
+		WritableTypeInfo<TestClass> tpeInfo2 = new WritableTypeInfo<>(TestClass.class);
+
+		assertEquals(tpeInfo1, tpeInfo2);
+		assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+	}
+
+	@Test
+	public void testWritableTypeInfoInequality() {
+		WritableTypeInfo<TestClass> tpeInfo1 = new WritableTypeInfo<>(TestClass.class);
+		WritableTypeInfo<AlternateClass> tpeInfo2 = new WritableTypeInfo<>(AlternateClass.class);
+
+		assertNotEquals(tpeInfo1, tpeInfo2);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test types
+	// ------------------------------------------------------------------------
+
+	public static class TestClass implements Writable {
+
+		@Override
+		public void write(DataOutput dataOutput) throws IOException {}
+
+		@Override
+		public void readFields(DataInput dataInput) throws IOException {}
+	}
+
+	public static class AlternateClass implements Writable {
+
+		@Override
+		public void write(DataOutput dataOutput) throws IOException {}
+
+		@Override
+		public void readFields(DataInput dataInput) throws IOException {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
new file mode 100644
index 0000000..c32f5da
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class StringArrayWritable implements Writable, Comparable<StringArrayWritable> {
+	
+	private String[] array = new String[0];
+	
+	public StringArrayWritable() {
+		super();
+	}
+	
+	public StringArrayWritable(String[] array) {
+		this.array = array;
+	}
+	
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeInt(this.array.length);
+		
+		for(String str : this.array) {
+			byte[] b = str.getBytes();
+			out.writeInt(b.length);
+			out.write(b);
+		}
+	}
+	
+	@Override
+	public void readFields(DataInput in) throws IOException {
+		this.array = new String[in.readInt()];
+		
+		for(int i = 0; i < this.array.length; i++) {
+			byte[] b = new byte[in.readInt()];
+			in.readFully(b);
+			this.array[i] = new String(b);
+		}
+	}
+	
+	@Override
+	public int compareTo(StringArrayWritable o) {
+		if(this.array.length != o.array.length) {
+			return this.array.length - o.array.length;
+		}
+		
+		for(int i = 0; i < this.array.length; i++) {
+			int comp = this.array[i].compareTo(o.array[i]);
+			if(comp != 0) {
+				return comp;
+			}
+		}
+		return 0;
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if(!(obj instanceof StringArrayWritable)) {
+			return false;
+		}
+		return this.compareTo((StringArrayWritable) obj) == 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
new file mode 100644
index 0000000..96f844c
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class WritableComparatorTest extends ComparatorTestBase<StringArrayWritable> {
+	
+	StringArrayWritable[] data = new StringArrayWritable[]{
+			new StringArrayWritable(new String[]{}),
+			new StringArrayWritable(new String[]{""}),
+			new StringArrayWritable(new String[]{"a","a"}),
+			new StringArrayWritable(new String[]{"a","b"}),
+			new StringArrayWritable(new String[]{"c","c"}),
+			new StringArrayWritable(new String[]{"d","f"}),
+			new StringArrayWritable(new String[]{"d","m"}),
+			new StringArrayWritable(new String[]{"z","x"}),
+			new StringArrayWritable(new String[]{"a","a", "a"})
+	};
+	
+	@Override
+	protected TypeComparator<StringArrayWritable> createComparator(boolean ascending) {
+		return new WritableComparator<StringArrayWritable>(ascending, StringArrayWritable.class);
+	}
+	
+	@Override
+	protected TypeSerializer<StringArrayWritable> createSerializer() {
+		return new WritableSerializer<StringArrayWritable>(StringArrayWritable.class);
+	}
+	
+	@Override
+	protected StringArrayWritable[] getSortedTestData() {
+		return data;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
new file mode 100644
index 0000000..94e759d
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class WritableComparatorUUIDTest extends ComparatorTestBase<WritableID> {
+	@Override
+	protected TypeComparator<WritableID> createComparator(boolean ascending) {
+		return new WritableComparator<>(ascending, WritableID.class);
+	}
+
+	@Override
+	protected TypeSerializer<WritableID> createSerializer() {
+		return new WritableSerializer<>(WritableID.class);
+	}
+
+	@Override
+	protected WritableID[] getSortedTestData() {
+		return new WritableID[] {
+			new WritableID(new UUID(0, 0)),
+			new WritableID(new UUID(1, 0)),
+			new WritableID(new UUID(1, 1))
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
new file mode 100644
index 0000000..4274cf6
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.UUID;
+
+public class WritableID implements WritableComparable<WritableID> {
+	private UUID uuid;
+
+	public WritableID() {
+		this.uuid = UUID.randomUUID();
+	}
+
+	public WritableID(UUID uuid) {
+		this.uuid = uuid;
+	}
+
+	@Override
+	public int compareTo(WritableID o) {
+		return this.uuid.compareTo(o.uuid);
+	}
+
+	@Override
+	public void write(DataOutput dataOutput) throws IOException {
+		dataOutput.writeLong(uuid.getMostSignificantBits());
+		dataOutput.writeLong(uuid.getLeastSignificantBits());
+	}
+
+	@Override
+	public void readFields(DataInput dataInput) throws IOException {
+		this.uuid = new UUID(dataInput.readLong(), dataInput.readLong());
+	}
+
+	@Override
+	public String toString() {
+		return uuid.toString();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		WritableID id = (WritableID) o;
+
+		return !(uuid != null ? !uuid.equals(id.uuid) : id.uuid != null);
+	}
+
+	@Override
+	public int hashCode() {
+		return uuid != null ? uuid.hashCode() : 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
new file mode 100644
index 0000000..bb5f4d4
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.junit.Test;
+
+public class WritableSerializerTest {
+	
+	@Test
+	public void testStringArrayWritable() {
+		StringArrayWritable[] data = new StringArrayWritable[]{
+				new StringArrayWritable(new String[]{}),
+				new StringArrayWritable(new String[]{""}),
+				new StringArrayWritable(new String[]{"a","a"}),
+				new StringArrayWritable(new String[]{"a","b"}),
+				new StringArrayWritable(new String[]{"c","c"}),
+				new StringArrayWritable(new String[]{"d","f"}),
+				new StringArrayWritable(new String[]{"d","m"}),
+				new StringArrayWritable(new String[]{"z","x"}),
+				new StringArrayWritable(new String[]{"a","a", "a"})
+		};
+		
+		WritableTypeInfo<StringArrayWritable> writableTypeInfo = (WritableTypeInfo<StringArrayWritable>) TypeExtractor.getForObject(data[0]);
+		WritableSerializer<StringArrayWritable> writableSerializer = (WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(new ExecutionConfig());
+		
+		SerializerTestInstance<StringArrayWritable> testInstance = new SerializerTestInstance<StringArrayWritable>(writableSerializer,writableTypeInfo.getTypeClass(), -1, data);
+		
+		testInstance.testAll();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
new file mode 100644
index 0000000..2af7730
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class WritableSerializerUUIDTest extends SerializerTestBase<WritableID> {
+	@Override
+	protected TypeSerializer<WritableID> createSerializer() {
+		return new WritableSerializer<>(WritableID.class);
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@Override
+	protected Class<WritableID> getTypeClass() {
+		return WritableID.class;
+	}
+
+	@Override
+	protected WritableID[] getTestData() {
+		return new WritableID[] {
+			new WritableID(new UUID(0, 0)),
+			new WritableID(new UUID(1, 0)),
+			new WritableID(new UUID(1, 1))
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
new file mode 100644
index 0000000..6f7673b
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.flink.api.java.utils.AbstractParameterToolTest;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class HadoopUtilsTest extends AbstractParameterToolTest {
+
+	@Test
+	public void testParamsFromGenericOptionsParser() throws IOException {
+		ParameterTool parameter = HadoopUtils.paramsFromGenericOptionsParser(new String[]{"-D", "input=myInput", "-DexpectedCount=15"});
+		validate(parameter);
+	}
+}