Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:12 UTC
[22/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
new file mode 100644
index 0000000..3a95d94
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.Writable;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.IOException;
+
+public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private Class<T> type;
+
+ private final boolean ascendingComparison;
+
+ private transient T reference;
+
+ private transient T tempReference;
+
+ private transient Kryo kryo;
+
+ @SuppressWarnings("rawtypes")
+ private final TypeComparator[] comparators = new TypeComparator[] {this};
+
+ public WritableComparator(boolean ascending, Class<T> type) {
+ this.type = type;
+ this.ascendingComparison = ascending;
+ }
+
+ @Override
+ public int hash(T record) {
+ return record.hashCode();
+ }
+
+ @Override
+ public void setReference(T toCompare) {
+ checkKryoInitialized();
+
+ reference = KryoUtils.copy(toCompare, kryo, new WritableSerializer<T>(type));
+ }
+
+ @Override
+ public boolean equalToReference(T candidate) {
+ return candidate.equals(reference);
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<T> referencedComparator) {
+ T otherRef = ((WritableComparator<T>) referencedComparator).reference;
+ int comp = otherRef.compareTo(reference);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compare(T first, T second) {
+ int comp = first.compareTo(second);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ ensureReferenceInstantiated();
+ ensureTempReferenceInstantiated();
+
+ reference.readFields(firstSource);
+ tempReference.readFields(secondSource);
+
+ int comp = reference.compareTo(tempReference);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return NormalizableKey.class.isAssignableFrom(type);
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ ensureReferenceInstantiated();
+
+ NormalizableKey<?> key = (NormalizableKey<?>) reference;
+ return key.getMaxNormalizedKeyLen();
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen();
+ }
+
+ @Override
+ public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
+ NormalizableKey<?> key = (NormalizableKey<?>) record;
+ key.copyNormalizedKey(target, offset, numBytes);
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascendingComparison;
+ }
+
+ @Override
+ public TypeComparator<T> duplicate() {
+ return new WritableComparator<T>(ascendingComparison, type);
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record;
+ return 1;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public TypeComparator[] getFlatComparators() {
+ return comparators;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // unsupported normalization
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false;
+ }
+
+ @Override
+ public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private void checkKryoInitialized() {
+ if (this.kryo == null) {
+ this.kryo = new Kryo();
+
+ Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
+ instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+ kryo.setInstantiatorStrategy(instantiatorStrategy);
+
+ this.kryo.setAsmEnabled(true);
+ this.kryo.register(type);
+ }
+ }
+
+ private void ensureReferenceInstantiated() {
+ if (reference == null) {
+ reference = InstantiationUtil.instantiate(type, Writable.class);
+ }
+ }
+
+ private void ensureTempReferenceInstantiated() {
+ if (tempReference == null) {
+ tempReference = InstantiationUtil.instantiate(type, Writable.class);
+ }
+ }
+}
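
For context, a minimal sketch of a key type this comparator can handle: WritableComparator<T> requires T to implement both Writable and Comparable<T>, and compare()/compareSerialized() delegate to that compareTo() method. The MyWritableKey class below is made up for illustration and is not part of this commit.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    // Hypothetical key type satisfying the <T extends Writable & Comparable<T>> bound.
    public class MyWritableKey implements Writable, Comparable<MyWritableKey> {

        private int id;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(id);                     // Hadoop-style binary serialization
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            id = in.readInt();
        }

        @Override
        public int compareTo(MyWritableKey other) {
            return Integer.compare(id, other.id); // used by compare() and compareSerialized()
        }
    }
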
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
new file mode 100644
index 0000000..9036d75
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.IOException;
+
+public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Class<T> typeClass;
+
+ private transient Kryo kryo;
+
+ private transient T copyInstance;
+
+ public WritableSerializer(Class<T> typeClass) {
+ this.typeClass = typeClass;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T createInstance() {
+ if(typeClass == NullWritable.class) {
+ return (T) NullWritable.get();
+ }
+ return InstantiationUtil.instantiate(typeClass);
+ }
+
+ @Override
+ public T copy(T from) {
+ checkKryoInitialized();
+
+ return KryoUtils.copy(from, kryo, this);
+ }
+
+ @Override
+ public T copy(T from, T reuse) {
+ checkKryoInitialized();
+
+ return KryoUtils.copy(from, reuse, kryo, this);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(T record, DataOutputView target) throws IOException {
+ record.write(target);
+ }
+
+ @Override
+ public T deserialize(DataInputView source) throws IOException {
+ return deserialize(createInstance(), source);
+ }
+
+ @Override
+ public T deserialize(T reuse, DataInputView source) throws IOException {
+ reuse.readFields(source);
+ return reuse;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ ensureInstanceInstantiated();
+ copyInstance.readFields(source);
+ copyInstance.write(target);
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public WritableSerializer<T> duplicate() {
+ return new WritableSerializer<T>(typeClass);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private void ensureInstanceInstantiated() {
+ if (copyInstance == null) {
+ copyInstance = createInstance();
+ }
+ }
+
+ private void checkKryoInitialized() {
+ if (this.kryo == null) {
+ this.kryo = new Kryo();
+
+ Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
+ instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+ kryo.setInstantiatorStrategy(instantiatorStrategy);
+
+ this.kryo.setAsmEnabled(true);
+ this.kryo.register(typeClass);
+ }
+ }
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return this.typeClass.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof WritableSerializer) {
+ WritableSerializer<?> other = (WritableSerializer<?>) obj;
+
+ return other.canEqual(this) && typeClass == other.typeClass;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof WritableSerializer;
+ }
+}
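
A rough usage sketch of the serializer on its own, assuming the DataOutputViewStreamWrapper/DataInputViewStreamWrapper helpers from flink-core; serialize() and deserialize() go straight through the Writable's own binary format:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
    import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    import org.apache.hadoop.io.Text;

    public class WritableSerializerSketch {

        public static void main(String[] args) throws IOException {
            WritableSerializer<Text> serializer = new WritableSerializer<>(Text.class);

            // Serialize a Hadoop Text value into a byte buffer.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serializer.serialize(new Text("hello"), new DataOutputViewStreamWrapper(bytes));

            // Deserialize it again; createInstance() provides the reuse object internally.
            Text copy = serializer.deserialize(
                    new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes.toByteArray())));

            System.out.println(copy); // hello
        }
    }
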
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
new file mode 100644
index 0000000..9e8a3e4
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+
+/**
+ * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
+ *
+ * It provides methods to create Flink InputFormat wrappers for Hadoop {@link org.apache.hadoop.mapred.InputFormat}
+ * and {@link org.apache.hadoop.mapreduce.InputFormat}.
+ *
+ * Key value pairs produced by the Hadoop InputFormats are converted into Flink
+ * {@link org.apache.flink.api.java.tuple.Tuple2 Tuple2} objects where the first field
+ * ({@link org.apache.flink.api.java.tuple.Tuple2#f0 Tuple2.f0}) is the key and the second field
+ * ({@link org.apache.flink.api.java.tuple.Tuple2#f1 Tuple2.f1}) is the value.
+ *
+ */
+public final class HadoopInputs {
+ // ----------------------------------- Hadoop Input Format ---------------------------------------
+
+ /**
+ * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat}.
+ *
+ * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+ */
+ public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
+ // set input path in JobConf
+ org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
+ // return wrapping InputFormat
+ return createHadoopInput(mapredInputFormat, key, value, job);
+ }
+
+ /**
+ * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat}.
+ *
+ * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+ */
+ public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
+ return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
+ }
+
+ /**
+ * Creates a Flink {@link InputFormat} to read a Hadoop sequence file for the given key and value classes.
+ *
+ * @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat.
+ */
+ public static <K,V> HadoopInputFormat<K, V> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
+ return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
+ }
+
+ /**
+ * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.InputFormat}.
+ *
+ * @return A Flink InputFormat that wraps the Hadoop InputFormat.
+ */
+ public static <K,V> HadoopInputFormat<K, V> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+ return new HadoopInputFormat<>(mapredInputFormat, key, value, job);
+ }
+
+ /**
+ * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}.
+ *
+ * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+ */
+ public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException
+ {
+ // set input path in Job
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
+ // return wrapping InputFormat
+ return createHadoopInput(mapreduceInputFormat, key, value, job);
+ }
+
+ /**
+ * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}.
+ *
+ * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+ */
+ public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException
+ {
+ return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
+ }
+
+ /**
+ * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.InputFormat}.
+ *
+ * @return A Flink InputFormat that wraps the Hadoop InputFormat.
+ */
+ public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
+ org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job)
+ {
+ return new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
+ }
+}
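
A short usage sketch of the mapred-side helper in a batch job; the input path and the choice of TextInputFormat are placeholders, not something prescribed by this commit:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class HadoopInputsSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Wrap a mapred TextInputFormat: keys are byte offsets, values are text lines.
            DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
                    HadoopInputs.readHadoopFile(
                            new TextInputFormat(), LongWritable.class, Text.class, "hdfs:///path/to/input"));

            input.print();
        }
    }
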
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
new file mode 100644
index 0000000..97ca329
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.commons.cli.Option;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class to work with Apache Hadoop libraries.
+ */
+public class HadoopUtils {
+ /**
+ * Returns a {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser}.
+ *
+ * @param args Input command-line arguments. They must be parsable by {@link GenericOptionsParser}.
+ * @return A {@link ParameterTool}
+ * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser}
+ * @see GenericOptionsParser
+ */
+ public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException {
+ Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions();
+ Map<String, String> map = new HashMap<String, String>();
+ for (Option option : options) {
+ String[] split = option.getValue().split("=");
+ map.put(split[0], split[1]);
+ }
+ return ParameterTool.fromMap(map);
+ }
+}
+
+
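
A usage sketch for this parser bridge; the property names are invented for illustration. Arguments such as -D input.path=/data/in are parsed by Hadoop's GenericOptionsParser and then exposed through Flink's ParameterTool:

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.hadoopcompatibility.HadoopUtils;

    public class HadoopUtilsSketch {

        public static void main(String[] args) throws Exception {
            // e.g. args = {"-D", "input.path=/data/in", "-D", "output.path=/data/out"}
            ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

            System.out.println("input.path  = " + params.get("input.path"));
            System.out.println("output.path = " + params.get("output.path"));
        }
    }
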
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
new file mode 100644
index 0000000..ba8aa90
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
+ */
+@SuppressWarnings("rawtypes")
+@Public
+public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+ extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>>
+ implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper;
+ private transient JobConf jobConf;
+
+ private transient HadoopOutputCollector<KEYOUT,VALUEOUT> outputCollector;
+ private transient Reporter reporter;
+
+ /**
+ * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
+ *
+ * @param hadoopMapper The Hadoop Mapper to wrap.
+ */
+ public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) {
+ this(hadoopMapper, new JobConf());
+ }
+
+ /**
+ * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
+ * The Hadoop Mapper is configured with the provided JobConf.
+ *
+ * @param hadoopMapper The Hadoop Mapper to wrap.
+ * @param conf The JobConf that is used to configure the Hadoop Mapper.
+ */
+ public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
+ if(hadoopMapper == null) {
+ throw new NullPointerException("Mapper may not be null.");
+ }
+ if(conf == null) {
+ throw new NullPointerException("JobConf may not be null.");
+ }
+
+ this.mapper = hadoopMapper;
+ this.jobConf = conf;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ this.mapper.configure(jobConf);
+
+ this.reporter = new HadoopDummyReporter();
+ this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
+ }
+
+ @Override
+ public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+ throws Exception {
+ outputCollector.setFlinkCollector(out);
+ mapper.map(value.f0, value.f1, outputCollector, reporter);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+ Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
+ Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
+
+ final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+ final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
+ return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo);
+ }
+
+ /**
+ * Custom serialization methods.
+ * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
+ */
+ private void writeObject(final ObjectOutputStream out) throws IOException {
+ out.writeObject(mapper.getClass());
+ jobConf.write(out);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+ Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> mapperClass =
+ (Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+ mapper = InstantiationUtil.instantiate(mapperClass);
+
+ jobConf = new JobConf();
+ jobConf.readFields(in);
+ }
+
+}
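
A sketch of how the wrapper is typically used: the Tokenizer mapper below is a made-up word-count mapper (mapred API), and `input` is assumed to be a DataSet<Tuple2<LongWritable, Text>>, e.g. obtained via HadoopInputs:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    // Hypothetical Hadoop mapper that splits lines into (word, 1) pairs.
    public class Tokenizer extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable offset, Text line,
                OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
            for (String word : line.toString().toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    out.collect(new Text(word), new LongWritable(1L));
                }
            }
        }
    }

    // Wiring in a Flink job (input: DataSet<Tuple2<LongWritable, Text>>):
    //
    //   DataSet<Tuple2<Text, LongWritable>> words = input.flatMap(
    //       new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()));
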
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
new file mode 100644
index 0000000..c1acc2b
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction.
+ */
+@SuppressWarnings("rawtypes")
+@Public
+public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+ extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
+ implements GroupCombineFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYIN,VALUEIN>>,
+ ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
+ private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner;
+ private transient JobConf jobConf;
+
+ private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
+ private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
+ private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
+ private transient Reporter reporter;
+
+ /**
+ * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
+ *
+ * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
+ * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
+ */
+ public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
+ Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) {
+ this(hadoopReducer, hadoopCombiner, new JobConf());
+ }
+
+ /**
+ * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
+ *
+ * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
+ * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
+ * @param conf The JobConf that is used to configure both Hadoop Reducers.
+ */
+ public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
+ Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
+ if(hadoopReducer == null) {
+ throw new NullPointerException("Reducer may not be null.");
+ }
+ if(hadoopCombiner == null) {
+ throw new NullPointerException("Combiner may not be null.");
+ }
+ if(conf == null) {
+ throw new NullPointerException("JobConf may not be null.");
+ }
+
+ this.reducer = hadoopReducer;
+ this.combiner = hadoopCombiner;
+ this.jobConf = conf;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ this.reducer.configure(jobConf);
+ this.combiner.configure(jobConf);
+
+ this.reporter = new HadoopDummyReporter();
+ Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
+ TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
+ this.valueIterator = new HadoopTupleUnwrappingIterator<>(keySerializer);
+ this.combineCollector = new HadoopOutputCollector<>();
+ this.reduceCollector = new HadoopOutputCollector<>();
+ }
+
+ @Override
+ public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+ throws Exception {
+ reduceCollector.setFlinkCollector(out);
+ valueIterator.set(values.iterator());
+ reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
+ }
+
+ @Override
+ public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception {
+ combineCollector.setFlinkCollector(out);
+ valueIterator.set(values.iterator());
+ combiner.reduce(valueIterator.getCurrentKey(), valueIterator, combineCollector, reporter);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+ Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
+ Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+
+ final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
+ final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass(outValClass);
+ return new TupleTypeInfo<>(keyTypeInfo, valueTypleInfo);
+ }
+
+ /**
+ * Custom serialization methods.
+ * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
+ */
+ private void writeObject(final ObjectOutputStream out) throws IOException {
+
+ out.writeObject(reducer.getClass());
+ out.writeObject(combiner.getClass());
+ jobConf.write(out);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+
+ Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
+ (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+ reducer = InstantiationUtil.instantiate(reducerClass);
+
+ Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass =
+ (Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
+ combiner = InstantiationUtil.instantiate(combinerClass);
+
+ jobConf = new JobConf();
+ jobConf.readFields(in);
+ }
+
+}
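
A sketch of a reducer that can also serve as its own combiner, which is exactly the case this wrapper targets (key and value types must match on input and output for the combine phase). The Counter class is invented for illustration; `words` is assumed to be a DataSet<Tuple2<Text, LongWritable>>:

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    // Hypothetical Hadoop reducer that sums the counts for a word.
    public class Counter extends MapReduceBase
            implements Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        public void reduce(Text word, Iterator<LongWritable> counts,
                OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
            long sum = 0;
            while (counts.hasNext()) {
                sum += counts.next().get();
            }
            out.collect(word, new LongWritable(sum));
        }
    }

    // Because the types are identical on both sides, the same class can act as
    // reducer and combiner:
    //
    //   DataSet<Tuple2<Text, LongWritable>> counts = words
    //       .groupBy(0)
    //       .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
    //               new Counter(), new Counter()));
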
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
new file mode 100644
index 0000000..55aea24
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+ */
+@SuppressWarnings("rawtypes")
+@Public
+public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+ extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
+ implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
+ private transient JobConf jobConf;
+
+ private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
+ private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
+ private transient Reporter reporter;
+
+ /**
+ * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+ *
+ * @param hadoopReducer The Hadoop Reducer to wrap.
+ */
+ public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer) {
+ this(hadoopReducer, new JobConf());
+ }
+
+ /**
+ * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+ *
+ * @param hadoopReducer The Hadoop Reducer to wrap.
+ * @param conf The JobConf that is used to configure the Hadoop Reducer.
+ */
+ public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
+ if(hadoopReducer == null) {
+ throw new NullPointerException("Reducer may not be null.");
+ }
+ if(conf == null) {
+ throw new NullPointerException("JobConf may not be null.");
+ }
+
+ this.reducer = hadoopReducer;
+ this.jobConf = conf;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ this.reducer.configure(jobConf);
+
+ this.reporter = new HadoopDummyReporter();
+ this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
+ Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
+ TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
+ this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer);
+ }
+
+ @Override
+ public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+ throws Exception {
+
+ reduceCollector.setFlinkCollector(out);
+ valueIterator.set(values.iterator());
+ reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+ Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
+ Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+
+ final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+ final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
+ return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo);
+ }
+
+ /**
+ * Custom serialization methods.
+ * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
+ */
+ private void writeObject(final ObjectOutputStream out) throws IOException {
+
+ out.writeObject(reducer.getClass());
+ jobConf.write(out);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+
+ Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
+ (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+ reducer = InstantiationUtil.instantiate(reducerClass);
+
+ jobConf = new JobConf();
+ jobConf.readFields(in);
+ }
+}
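
By contrast, a reducer whose output types differ from its input types cannot act as a combiner and is wrapped with this non-combinable variant. The CountFormatter class below is made up for illustration; `words` is again assumed to be a DataSet<Tuple2<Text, LongWritable>>:

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    // Hypothetical reducer whose output types differ from its input types; such a
    // reducer cannot double as a combiner, so it is wrapped with HadoopReduceFunction.
    public class CountFormatter extends MapReduceBase
            implements Reducer<Text, LongWritable, Text, Text> {

        @Override
        public void reduce(Text word, Iterator<LongWritable> counts,
                OutputCollector<Text, Text> out, Reporter reporter) throws IOException {
            long sum = 0;
            while (counts.hasNext()) {
                sum += counts.next().get();
            }
            out.collect(word, new Text("count=" + sum));
        }
    }

    // Wiring (words: DataSet<Tuple2<Text, LongWritable>>):
    //
    //   DataSet<Tuple2<Text, Text>> formatted = words
    //       .groupBy(0)
    //       .reduceGroup(new HadoopReduceFunction<Text, LongWritable, Text, Text>(new CountFormatter()));
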
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
new file mode 100644
index 0000000..bfe03d3
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import java.io.IOException;
+
+/**
+ * A Hadoop OutputCollector that wraps a Flink Collector.
+ * On each call of collect() the data is forwarded to the wrapped Flink collector.
+ */
+public final class HadoopOutputCollector<KEY,VALUE> implements OutputCollector<KEY,VALUE> {
+
+ private Collector<Tuple2<KEY,VALUE>> flinkCollector;
+
+ private final Tuple2<KEY,VALUE> outTuple = new Tuple2<KEY, VALUE>();
+
+ /**
+ * Set the wrapped Flink collector.
+ *
+ * @param flinkCollector The Flink Collector to wrap.
+ */
+ public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector) {
+ this.flinkCollector = flinkCollector;
+ }
+
+ /**
+ * Use the wrapped Flink collector to collect a key-value pair for Flink.
+ *
+ * @param key the key to collect
+ * @param val the value to collect
+ * @throws IOException if the key-value pair cannot be forwarded to the wrapped Flink collector
+ */
+ @Override
+ public void collect(final KEY key, final VALUE val) throws IOException {
+ this.outTuple.f0 = key;
+ this.outTuple.f1 = val;
+ this.flinkCollector.collect(outTuple);
+ }
+}
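
A small sketch of the adapter in isolation, assuming flink-core's ListCollector as the wrapped Flink collector. Note that the adapter reuses a single Tuple2 instance across collect() calls, so downstream consumers must copy records they hold on to:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.functions.util.ListCollector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class OutputCollectorSketch {

        public static void main(String[] args) throws IOException {
            List<Tuple2<Text, LongWritable>> buffer = new ArrayList<>();

            HadoopOutputCollector<Text, LongWritable> hadoopCollector = new HadoopOutputCollector<>();
            hadoopCollector.setFlinkCollector(new ListCollector<>(buffer));

            // A Hadoop mapper/reducer calls collect(key, value); the pair is forwarded
            // to the Flink collector as a Tuple2.
            hadoopCollector.collect(new Text("flink"), new LongWritable(1L));

            System.out.println(buffer); // [(flink,1)]
        }
    }
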
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
new file mode 100644
index 0000000..2d204b8
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field.
+ */
+public class HadoopTupleUnwrappingIterator<KEY,VALUE>
+ extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TypeSerializer<KEY> keySerializer;
+
+ private transient Iterator<Tuple2<KEY,VALUE>> iterator;
+
+ private transient KEY curKey;
+ private transient VALUE firstValue;
+ private transient boolean atFirst;
+
+ public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer) {
+ this.keySerializer = checkNotNull(keySerializer);
+ }
+
+ /**
+ * Set the Flink iterator to wrap.
+ *
+ * @param iterator The Flink iterator to wrap.
+ */
+ @Override
+ public void set(final Iterator<Tuple2<KEY,VALUE>> iterator) {
+ this.iterator = iterator;
+ if(this.hasNext()) {
+ final Tuple2<KEY, VALUE> tuple = iterator.next();
+ this.curKey = keySerializer.copy(tuple.f0);
+ this.firstValue = tuple.f1;
+ this.atFirst = true;
+ } else {
+ this.atFirst = false;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ if(this.atFirst) {
+ return true;
+ }
+ return iterator.hasNext();
+ }
+
+ @Override
+ public VALUE next() {
+ if(this.atFirst) {
+ this.atFirst = false;
+ return firstValue;
+ }
+
+ final Tuple2<KEY, VALUE> tuple = iterator.next();
+ return tuple.f1;
+ }
+
+ public KEY getCurrentKey() {
+ return this.curKey;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+}
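
A sketch of the iterator's contract, using a String key and Integer values purely for illustration: all tuples of one reduce group carry the same key, getCurrentKey() exposes a copy of that key, and next() returns only the value fields:

    import java.util.Arrays;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;

    public class UnwrappingIteratorSketch {

        public static void main(String[] args) {
            HadoopTupleUnwrappingIterator<String, Integer> it =
                    new HadoopTupleUnwrappingIterator<String, Integer>(
                            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()));

            // All tuples of one group share the same key; the iterator exposes the values only.
            it.set(Arrays.asList(
                    Tuple2.of("flink", 1), Tuple2.of("flink", 2), Tuple2.of("flink", 3)).iterator());

            System.out.println(it.getCurrentKey()); // flink
            while (it.hasNext()) {
                System.out.println(it.next());      // 1, 2, 3
            }
        }
    }
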
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
new file mode 100644
index 0000000..133a5f4
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.hadoopcompatibility.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.hadoop.mapreduce
+import org.apache.flink.api.scala.hadoop.mapred
+import org.apache.hadoop.fs.{Path => HadoopPath}
+import org.apache.hadoop.mapred.{JobConf, FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
+import org.apache.hadoop.mapreduce.{Job, InputFormat => MapreduceInputFormat}
+
+/**
+ * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
+ *
+ * It provides methods to create Flink InputFormat wrappers for Hadoop
+ * [[org.apache.hadoop.mapred.InputFormat]] and [[org.apache.hadoop.mapreduce.InputFormat]].
+ *
+ * Key value pairs produced by the Hadoop InputFormats are converted into [[Tuple2]] where
+ * the first field is the key and the second field is the value.
+ *
+ */
+object HadoopInputs {
+
+ /**
+ * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+ * [[org.apache.hadoop.mapred.FileInputFormat]].
+ */
+ def readHadoopFile[K, V](
+ mapredInputFormat: MapredFileInputFormat[K, V],
+ key: Class[K],
+ value: Class[V],
+ inputPath: String,
+ job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+ // set input path in JobConf
+ MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
+ // wrap mapredInputFormat
+ createHadoopInput(mapredInputFormat, key, value, job)
+ }
+
+ /**
+ * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+ * [[org.apache.hadoop.mapred.FileInputFormat]].
+ */
+ def readHadoopFile[K, V](
+ mapredInputFormat: MapredFileInputFormat[K, V],
+ key: Class[K],
+ value: Class[V],
+ inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+ readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf)
+ }
+
+ /**
+ * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that reads a Hadoop sequence
+ * file with the given key and value classes.
+ */
+ def readSequenceFile[K, V](
+ key: Class[K],
+ value: Class[V],
+ inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+ readHadoopFile(
+ new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V],
+ key,
+ value,
+ inputPath
+ )
+ }
+
+ /**
+ * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+ * [[org.apache.hadoop.mapred.InputFormat]].
+ */
+ def createHadoopInput[K, V](
+ mapredInputFormat: MapredInputFormat[K, V],
+ key: Class[K],
+ value: Class[V],
+ job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+ new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
+ }
+
+ /**
+ * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+ * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
+ */
+ def readHadoopFile[K, V](
+ mapreduceInputFormat: MapreduceFileInputFormat[K, V],
+ key: Class[K],
+ value: Class[V],
+ inputPath: String,
+ job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
+
+ // set input path in Job
+ MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
+ // wrap mapreduceInputFormat
+ createHadoopInput(mapreduceInputFormat, key, value, job)
+ }
+
+ /**
+ * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+ * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
+ */
+ def readHadoopFile[K, V](
+ mapreduceInputFormat: MapreduceFileInputFormat[K, V],
+ key: Class[K],
+ value: Class[V],
+ inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] =
+ {
+ readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance)
+ }
+
+ /**
+ * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+ * [[org.apache.hadoop.mapreduce.InputFormat]].
+ */
+ def createHadoopInput[K, V](
+ mapreduceInputFormat: MapreduceInputFormat[K, V],
+ key: Class[K],
+ value: Class[V],
+ job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
+
+ new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, job)
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
new file mode 100644
index 0000000..2aefd9f
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparator;
+
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class WritableExtractionTest {
+
+ @Test
+ public void testDetectWritable() {
+ // writable interface itself must not be writable
+ assertFalse(TypeExtractor.isHadoopWritable(Writable.class));
+
+ // various forms of extension
+ assertTrue(TypeExtractor.isHadoopWritable(DirectWritable.class));
+ assertTrue(TypeExtractor.isHadoopWritable(ViaInterfaceExtension.class));
+ assertTrue(TypeExtractor.isHadoopWritable(ViaAbstractClassExtension.class));
+
+ // some non-writables
+ assertFalse(TypeExtractor.isHadoopWritable(String.class));
+ assertFalse(TypeExtractor.isHadoopWritable(List.class));
+ assertFalse(TypeExtractor.isHadoopWritable(WritableComparator.class));
+ }
+
+ @Test
+ public void testCreateWritableInfo() {
+ TypeInformation<DirectWritable> info1 =
+ TypeExtractor.createHadoopWritableTypeInfo(DirectWritable.class);
+ assertEquals(DirectWritable.class, info1.getTypeClass());
+
+ TypeInformation<ViaInterfaceExtension> info2 =
+ TypeExtractor.createHadoopWritableTypeInfo(ViaInterfaceExtension.class);
+ assertEquals(ViaInterfaceExtension.class, info2.getTypeClass());
+
+ TypeInformation<ViaAbstractClassExtension> info3 =
+ TypeExtractor.createHadoopWritableTypeInfo(ViaAbstractClassExtension.class);
+ assertEquals(ViaAbstractClassExtension.class, info3.getTypeClass());
+ }
+
+ @Test
+ public void testValidateTypeInfo() {
+ // validate unrelated type info
+ TypeExtractor.validateIfWritable(BasicTypeInfo.STRING_TYPE_INFO, String.class);
+
+ // validate writable type info correctly
+ TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
+ DirectWritable.class), DirectWritable.class);
+ TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
+ ViaInterfaceExtension.class), ViaInterfaceExtension.class);
+ TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
+ ViaAbstractClassExtension.class), ViaAbstractClassExtension.class);
+
+ // incorrect case: not writable at all
+ try {
+ TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
+ DirectWritable.class), String.class);
+ fail("should have failed with an exception");
+ } catch (InvalidTypesException e) {
+ // expected
+ }
+
+ // incorrect case: wrong writable
+ try {
+ TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
+ ViaInterfaceExtension.class), DirectWritable.class);
+ fail("should have failed with an exception");
+ } catch (InvalidTypesException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testExtractFromFunction() {
+ RichMapFunction<DirectWritable, DirectWritable> function = new RichMapFunction<DirectWritable, DirectWritable>() {
+ @Override
+ public DirectWritable map(DirectWritable value) throws Exception {
+ return null;
+ }
+ };
+
+ TypeInformation<DirectWritable> outType =
+ TypeExtractor.getMapReturnTypes(function, new WritableTypeInfo<>(DirectWritable.class));
+
+ assertTrue(outType instanceof WritableTypeInfo);
+ assertEquals(DirectWritable.class, outType.getTypeClass());
+ }
+
+ @Test
+ public void testExtractAsPartOfPojo() {
+ PojoTypeInfo<PojoWithWritable> pojoInfo =
+ (PojoTypeInfo<PojoWithWritable>) TypeExtractor.getForClass(PojoWithWritable.class);
+
+ boolean foundWritable = false;
+ for (int i = 0; i < pojoInfo.getArity(); i++) {
+ PojoField field = pojoInfo.getPojoFieldAt(i);
+ String name = field.getField().getName();
+
+ if (name.equals("hadoopCitizen")) {
+ if (foundWritable) {
+ fail("already seen");
+ }
+ foundWritable = true;
+ assertEquals(new WritableTypeInfo<>(DirectWritable.class), field.getTypeInformation());
+ assertEquals(DirectWritable.class, field.getTypeInformation().getTypeClass());
+ }
+ }
+
+ assertTrue("missed the writable type", foundWritable);
+ }
+
+ @Test
+ public void testInputValidationError() {
+
+ RichMapFunction<Writable, String> function = new RichMapFunction<Writable, String>() {
+ @Override
+ public String map(Writable value) throws Exception {
+ return null;
+ }
+ };
+
+ @SuppressWarnings("unchecked")
+ TypeInformation<Writable> inType =
+ (TypeInformation<Writable>) (TypeInformation<?>) new WritableTypeInfo<>(DirectWritable.class);
+
+ try {
+ TypeExtractor.getMapReturnTypes(function, inType);
+ fail("exception expected");
+ }
+ catch (InvalidTypesException e) {
+ // right
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // test type classes
+ // ------------------------------------------------------------------------
+
+ public interface ExtendedWritable extends Writable {}
+
+ public abstract static class AbstractWritable implements Writable {}
+
+ public static class DirectWritable implements Writable {
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {}
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {}
+ }
+
+ public static class ViaInterfaceExtension implements ExtendedWritable {
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {}
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {}
+ }
+
+ public static class ViaAbstractClassExtension extends AbstractWritable {
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {}
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {}
+ }
+
+ public static class PojoWithWritable {
+ public String str;
+ public DirectWritable hadoopCitizen;
+ }
+}
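
The extraction logic tested above is what spares users from giving explicit type hints: any class implementing Writable is expected to be picked up as a WritableTypeInfo automatically. A small hypothetical sketch, reusing the DirectWritable test class from this file:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.flink.api.java.typeutils.WritableExtractionTest.DirectWritable;

public class WritableExtractionSketch {

	public static void main(String[] args) {
		// No type hint given; the extractor should recognize the Writable subtype on its own.
		TypeInformation<DirectWritable> info = TypeExtractor.getForClass(DirectWritable.class);
		System.out.println(info instanceof WritableTypeInfo);  // expected: true
	}
}
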
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
new file mode 100644
index 0000000..3d2b652
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.io.Writable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class WritableInfoParserTest {
+
+ @Test
+ public void testWritableType() {
+ TypeInformation<?> ti = TypeInfoParser.parse(
+ "Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>");
+
+ Assert.assertTrue(ti instanceof WritableTypeInfo<?>);
+ Assert.assertEquals(MyWritable.class, ((WritableTypeInfo<?>) ti).getTypeClass());
+ }
+
+ @Test
+ public void testPojoWithWritableType() {
+ TypeInformation<?> ti = TypeInfoParser.parse(
+ "org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyPojo<"
+ + "basic=Integer,"
+ + "tuple=Tuple2<String, Integer>,"
+ + "hadoopCitizen=Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>,"
+ + "array=String[]"
+ + ">");
+ Assert.assertTrue(ti instanceof PojoTypeInfo);
+ PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
+ Assert.assertEquals("array", pti.getPojoFieldAt(0).getField().getName());
+ Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicArrayTypeInfo);
+ Assert.assertEquals("basic", pti.getPojoFieldAt(1).getField().getName());
+ Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() instanceof BasicTypeInfo);
+ Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).getField().getName());
+ Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() instanceof WritableTypeInfo);
+ Assert.assertEquals("tuple", pti.getPojoFieldAt(3).getField().getName());
+ Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() instanceof TupleTypeInfo);
+ }
+
+ // ------------------------------------------------------------------------
+ // Test types
+ // ------------------------------------------------------------------------
+
+ public static class MyWritable implements Writable {
+
+ @Override
+ public void write(DataOutput out) throws IOException {}
+
+ @Override
+ public void readFields(DataInput in) throws IOException {}
+ }
+
+ public static class MyPojo {
+ public Integer basic;
+ public Tuple2<String, Integer> tuple;
+ public MyWritable hadoopCitizen;
+ public String[] array;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
new file mode 100644
index 0000000..eb9cdf0
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.util.TestLogger;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class WritableTypeInfoTest extends TestLogger {
+
+ @Test
+ public void testWritableTypeInfoEquality() {
+ WritableTypeInfo<TestClass> tpeInfo1 = new WritableTypeInfo<>(TestClass.class);
+ WritableTypeInfo<TestClass> tpeInfo2 = new WritableTypeInfo<>(TestClass.class);
+
+ assertEquals(tpeInfo1, tpeInfo2);
+ assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+ }
+
+ @Test
+ public void testWritableTypeInfoInequality() {
+ WritableTypeInfo<TestClass> tpeInfo1 = new WritableTypeInfo<>(TestClass.class);
+ WritableTypeInfo<AlternateClass> tpeInfo2 = new WritableTypeInfo<>(AlternateClass.class);
+
+ assertNotEquals(tpeInfo1, tpeInfo2);
+ }
+
+ // ------------------------------------------------------------------------
+ // test types
+ // ------------------------------------------------------------------------
+
+ public static class TestClass implements Writable {
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {}
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {}
+ }
+
+ public static class AlternateClass implements Writable {
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {}
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
new file mode 100644
index 0000000..c32f5da
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+public class StringArrayWritable implements Writable, Comparable<StringArrayWritable> {
+
+ private String[] array = new String[0];
+
+ public StringArrayWritable() {
+ super();
+ }
+
+ public StringArrayWritable(String[] array) {
+ this.array = array;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(this.array.length);
+
+ for(String str : this.array) {
+ byte[] b = str.getBytes(StandardCharsets.UTF_8);
+ out.writeInt(b.length);
+ out.write(b);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.array = new String[in.readInt()];
+
+ for(int i = 0; i < this.array.length; i++) {
+ byte[] b = new byte[in.readInt()];
+ in.readFully(b);
+ this.array[i] = new String(b, StandardCharsets.UTF_8);
+ }
+ }
+
+ @Override
+ public int compareTo(StringArrayWritable o) {
+ if(this.array.length != o.array.length) {
+ return this.array.length - o.array.length;
+ }
+
+ for(int i = 0; i < this.array.length; i++) {
+ int comp = this.array[i].compareTo(o.array[i]);
+ if(comp != 0) {
+ return comp;
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if(!(obj instanceof StringArrayWritable)) {
+ return false;
+ }
+ return this.compareTo((StringArrayWritable) obj) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ // keep hashCode() consistent with equals(), which compares element-wise
+ int result = 1;
+ for(String str : this.array) {
+ result = 31 * result + str.hashCode();
+ }
+ return result;
+ }
+}
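
One detail worth noting in compareTo above: the array length dominates the ordering, and elements are only compared when the lengths match. That is why {"z", "x"} precedes {"a", "a", "a"} in the sorted test data of the comparator test that follows. A tiny illustrative snippet, not part of the commit:

import org.apache.flink.api.java.typeutils.runtime.StringArrayWritable;

public class StringArrayOrderingSketch {

	public static void main(String[] args) {
		StringArrayWritable shorter = new StringArrayWritable(new String[] {"z", "x"});
		StringArrayWritable longer = new StringArrayWritable(new String[] {"a", "a", "a"});

		// Two elements versus three: the shorter array sorts first regardless of content.
		System.out.println(shorter.compareTo(longer) < 0);  // expected: true
	}
}
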
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
new file mode 100644
index 0000000..96f844c
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class WritableComparatorTest extends ComparatorTestBase<StringArrayWritable> {
+
+ StringArrayWritable[] data = new StringArrayWritable[]{
+ new StringArrayWritable(new String[]{}),
+ new StringArrayWritable(new String[]{""}),
+ new StringArrayWritable(new String[]{"a","a"}),
+ new StringArrayWritable(new String[]{"a","b"}),
+ new StringArrayWritable(new String[]{"c","c"}),
+ new StringArrayWritable(new String[]{"d","f"}),
+ new StringArrayWritable(new String[]{"d","m"}),
+ new StringArrayWritable(new String[]{"z","x"}),
+ new StringArrayWritable(new String[]{"a","a", "a"})
+ };
+
+ @Override
+ protected TypeComparator<StringArrayWritable> createComparator(boolean ascending) {
+ return new WritableComparator<StringArrayWritable>(ascending, StringArrayWritable.class);
+ }
+
+ @Override
+ protected TypeSerializer<StringArrayWritable> createSerializer() {
+ return new WritableSerializer<StringArrayWritable>(StringArrayWritable.class);
+ }
+
+ @Override
+ protected StringArrayWritable[] getSortedTestData() {
+ return data;
+ }
+}
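
For reference, a hedged sketch of the end-user scenario this comparator backs: sorting (or grouping) a data set on a field whose type is both Writable and Comparable. The class name and sample data are illustrative only.

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.StringArrayWritable;

public class WritableKeySketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<StringArrayWritable, Integer>> data = env.fromElements(
				Tuple2.of(new StringArrayWritable(new String[] {"b"}), 1),
				Tuple2.of(new StringArrayWritable(new String[] {"a"}), 2));

		// Field 0 is a Writable & Comparable type, so sorting on it goes through
		// the WritableComparator tested above.
		data.sortPartition(0, Order.ASCENDING).print();
	}
}
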
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
new file mode 100644
index 0000000..94e759d
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class WritableComparatorUUIDTest extends ComparatorTestBase<WritableID> {
+ @Override
+ protected TypeComparator<WritableID> createComparator(boolean ascending) {
+ return new WritableComparator<>(ascending, WritableID.class);
+ }
+
+ @Override
+ protected TypeSerializer<WritableID> createSerializer() {
+ return new WritableSerializer<>(WritableID.class);
+ }
+
+ @Override
+ protected WritableID[] getSortedTestData() {
+ return new WritableID[] {
+ new WritableID(new UUID(0, 0)),
+ new WritableID(new UUID(1, 0)),
+ new WritableID(new UUID(1, 1))
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
new file mode 100644
index 0000000..4274cf6
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.UUID;
+
+public class WritableID implements WritableComparable<WritableID> {
+ private UUID uuid;
+
+ public WritableID() {
+ this.uuid = UUID.randomUUID();
+ }
+
+ public WritableID(UUID uuid) {
+ this.uuid = uuid;
+ }
+
+ @Override
+ public int compareTo(WritableID o) {
+ return this.uuid.compareTo(o.uuid);
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ dataOutput.writeLong(uuid.getMostSignificantBits());
+ dataOutput.writeLong(uuid.getLeastSignificantBits());
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ this.uuid = new UUID(dataInput.readLong(), dataInput.readLong());
+ }
+
+ @Override
+ public String toString() {
+ return uuid.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ WritableID id = (WritableID) o;
+
+ return uuid != null ? uuid.equals(id.uuid) : id.uuid == null;
+ }
+
+ @Override
+ public int hashCode() {
+ return uuid != null ? uuid.hashCode() : 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
new file mode 100644
index 0000000..bb5f4d4
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.junit.Test;
+
+public class WritableSerializerTest {
+
+ @Test
+ public void testStringArrayWritable() {
+ StringArrayWritable[] data = new StringArrayWritable[]{
+ new StringArrayWritable(new String[]{}),
+ new StringArrayWritable(new String[]{""}),
+ new StringArrayWritable(new String[]{"a","a"}),
+ new StringArrayWritable(new String[]{"a","b"}),
+ new StringArrayWritable(new String[]{"c","c"}),
+ new StringArrayWritable(new String[]{"d","f"}),
+ new StringArrayWritable(new String[]{"d","m"}),
+ new StringArrayWritable(new String[]{"z","x"}),
+ new StringArrayWritable(new String[]{"a","a", "a"})
+ };
+
+ WritableTypeInfo<StringArrayWritable> writableTypeInfo = (WritableTypeInfo<StringArrayWritable>) TypeExtractor.getForObject(data[0]);
+ WritableSerializer<StringArrayWritable> writableSerializer = (WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(new ExecutionConfig());
+
+ SerializerTestInstance<StringArrayWritable> testInstance = new SerializerTestInstance<StringArrayWritable>(writableSerializer, writableTypeInfo.getTypeClass(), -1, data);
+
+ testInstance.testAll();
+ }
+}
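
A minimal, hypothetical round trip through the WritableSerializer outside the test harness; the stream-view wrappers come from flink-core and the sample data is illustrative.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.flink.api.java.typeutils.runtime.StringArrayWritable;
import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

public class WritableSerializerRoundTrip {

	public static void main(String[] args) throws Exception {
		WritableSerializer<StringArrayWritable> serializer =
				new WritableSerializer<>(StringArrayWritable.class);

		StringArrayWritable original = new StringArrayWritable(new String[] {"x", "y"});

		// Serialize into a byte buffer through Flink's DataOutputView.
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		serializer.serialize(original, new DataOutputViewStreamWrapper(bytes));

		// Read it back and verify the copy compares equal to the original.
		StringArrayWritable copy = serializer.deserialize(
				new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes.toByteArray())));
		System.out.println(original.equals(copy));  // expected: true
	}
}
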
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
new file mode 100644
index 0000000..2af7730
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class WritableSerializerUUIDTest extends SerializerTestBase<WritableID> {
+ @Override
+ protected TypeSerializer<WritableID> createSerializer() {
+ return new WritableSerializer<>(WritableID.class);
+ }
+
+ @Override
+ protected int getLength() {
+ return -1;
+ }
+
+ @Override
+ protected Class<WritableID> getTypeClass() {
+ return WritableID.class;
+ }
+
+ @Override
+ protected WritableID[] getTestData() {
+ return new WritableID[] {
+ new WritableID(new UUID(0, 0)),
+ new WritableID(new UUID(1, 0)),
+ new WritableID(new UUID(1, 1))
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
new file mode 100644
index 0000000..6f7673b
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.flink.api.java.utils.AbstractParameterToolTest;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class HadoopUtilsTest extends AbstractParameterToolTest {
+
+ @Test
+ public void testParamsFromGenericOptionsParser() throws IOException {
+ ParameterTool parameter = HadoopUtils.paramsFromGenericOptionsParser(new String[]{"-D", "input=myInput", "-DexpectedCount=15"});
+ validate(parameter);
+ }
+}
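
A hypothetical main() showing the same call the test exercises: Hadoop-style -D options are parsed by the GenericOptionsParser and handed back as a Flink ParameterTool.

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.hadoopcompatibility.HadoopUtils;

public class GenericOptionsSketch {

	public static void main(String[] args) throws Exception {
		// Same arguments as in the test above.
		ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(
				new String[] {"-D", "input=myInput", "-DexpectedCount=15"});

		System.out.println(params.get("input"));             // myInput
		System.out.println(params.getInt("expectedCount"));  // 15
	}
}
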